@smalyshev smalyshev commented Jul 30, 2025

Introduce the ExecutesOn interface, which provides a way to mark a plan as executed only on the coordinator or only remotely. This lets us generalize the checks for constructs that cannot run after coordinator-only constructs. It also identifies the constructs that are currently not allowed before remote-only constructs:

  • STATS
  • INLINESTATS
  • FORK
  • RERANK
  • COMPLETION
  • CHANGE_POINT
  • coordinator ENRICH
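
A minimal sketch of what such a marker could look like, assuming a three-valued location and nested sub-interfaces with default methods. The names `ExecuteLocation`, `executesOn()`, `locationOf`, and the toy node classes are illustrative assumptions, not the code in this PR.

```java
// Hypothetical sketch: all names here are illustrative, not the actual Elasticsearch API.
interface ExecutesOn {
    enum ExecuteLocation { COORDINATOR, REMOTE, ANY }

    ExecuteLocation executesOn();

    /** Marker for plan nodes that must run on the coordinator. */
    interface Coordinator extends ExecutesOn {
        @Override
        default ExecuteLocation executesOn() {
            return ExecuteLocation.COORDINATOR;
        }
    }

    /** Marker for plan nodes that must run remotely. */
    interface Remote extends ExecutesOn {
        @Override
        default ExecuteLocation executesOn() {
            return ExecuteLocation.REMOTE;
        }
    }

    /** Nodes that carry no marker are treated as runnable anywhere. */
    static ExecuteLocation locationOf(Object node) {
        return node instanceof ExecutesOn e ? e.executesOn() : ExecuteLocation.ANY;
    }
}

// Toy plan nodes standing in for the real ones.
final class StatsNode implements ExecutesOn.Coordinator {}

final class RemoteEnrichNode implements ExecutesOn.Remote {}

final class EvalNode {}
```

With this shape, a verifier can ask any node for its location through `ExecutesOn.locationOf(node)` instead of hard-coding a list of node types.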

@smalyshev smalyshev requested review from alex-spies and Copilot and removed request for Copilot July 30, 2025 22:57
@elastic elastic deleted a comment from Copilot AI Jul 30, 2025
@elastic elastic deleted a comment from Copilot AI Jul 30, 2025
@alex-spies alex-spies left a comment

This is very nice. I think we should go with this approach.

To be complete, I think SORT/TopN and LIMIT normally also deserve to be marked as ExecutesOn.COORDINATOR; remote ENRICH is special in that it can't deal with stuff that requires the coordinator, except topn and limit specifically - but remote LOOKUP JOIN doesn't deal well with any coordinator-only nodes, and would also profit from using this abstraction.

I think LIMITs are a very interesting case this could cover nicely; more specifically LIMITs before remote LOOKUP JOINs. They are currently considered invalid if a user puts them there, but not if our optimizations put them there - because we know that the optimization only duplicates a limit rather than pushing it fully past LOOKUP JOIN.

Marking the duplicated LIMIT as ANY, while other limits are generally marked as COORDINATOR, would make the validation less brittle if other optimizations end up putting LIMITs upstream from remote LOOKUP JOINs, or if other surrogate plans end up creating LIMITs somehow.
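
To make the idea concrete, here is a rough sketch of a validation that only looks at location markers. It assumes a linear plan modeled as a list ordered from source to sink; `Location`, `PlanNode`, and `RemoteCompatibilityCheck` are hypothetical names for illustration, not classes from the codebase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a linear plan is a list ordered from source (index 0) to sink.
enum Location { COORDINATOR, REMOTE, ANY }

record PlanNode(String name, Location location) {}

final class RemoteCompatibilityCheck {
    /** Returns one message per coordinator-only node found upstream of a remote-only node. */
    static List<String> verify(List<PlanNode> plan) {
        List<String> failures = new ArrayList<>();
        for (int i = 0; i < plan.size(); i++) {
            PlanNode node = plan.get(i);
            if (node.location() != Location.REMOTE) {
                continue;
            }
            // Everything before index i is upstream of the remote-only node.
            for (int j = 0; j < i; j++) {
                PlanNode upstream = plan.get(j);
                if (upstream.location() == Location.COORDINATOR) {
                    failures.add(node.name() + " cannot follow " + upstream.name());
                }
            }
        }
        return failures;
    }
}
```

In this model, a user-written `LIMIT` marked `COORDINATOR` before a remote `LOOKUP JOIN` is rejected, while a duplicated `LIMIT` marked `ANY` in the same position passes without any special case for limits.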

implements
PostAnalysisPlanVerificationAware,
TelemetryAware,
PipelineBreaker,
Contributor

Marking FORK as pipeline breaker would become obsolete with the new interface, right?

Maybe the whole PipelineBreaker interface becomes obsolete; or maybe not, because the original pipeline breakers (TopN, Limit, Aggs) are special in that they get "duplicated" into the fragment if they are the first node on the coordinator.

Contributor Author

Possibly? I think PipelineBreaker may have its uses but it's both too broad and too inexact for what we need here, that's why I want to introduce a narrow-targeted interface.

Contributor

I think introducing the narrow interface is nice :)

But Fork wasn't marked as pipeline breaker until #131945, and there we discussed that marking it as pipeline breaker could lead to confusion in other places that use that interface in a more specific way.

I'm proposing we undo that change and just mark Fork as ExecutesOn.Coordinator - marking it as a pipeline breaker shouldn't be required anymore, right?

Contributor Author

I think you are right, it doesn't really need to be marked as breaker anymore now.
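
For illustration, the separation discussed in this thread could look roughly like the sketch below. `PipelineBreakerMarker`, `CoordinatorOnly`, and the toy plan classes are hypothetical stand-ins, and marking TopN with both interfaces is an assumption made only for the example.

```java
// Hypothetical stand-ins for the two marker interfaces discussed above.
interface PipelineBreakerMarker {}

interface CoordinatorOnly {}

// Assumption for the example: TopN keeps both markers.
final class TopNPlan implements PipelineBreakerMarker, CoordinatorOnly {}

// Fork carries only the execution-location marker.
final class ForkPlan implements CoordinatorOnly {}

final class Markers {
    /** Code that relies on the narrower pipeline-breaker meaning no longer sees Fork. */
    static boolean breaksPipeline(Object plan) {
        return plan instanceof PipelineBreakerMarker;
    }

    /** Remote-compatibility checks still see Fork as coordinator-only. */
    static boolean coordinatorOnly(Object plan) {
        return plan instanceof CoordinatorOnly;
    }
}
```

Keeping the two markers independent means callers that use the pipeline-breaker interface in its more specific sense are unaffected by Fork.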

  LogicalPlan p = plan;
  if (plan.child() instanceof OrderBy o) {
-     p = new TopN(plan.source(), o.child(), o.order(), plan.limit());
+     p = new TopN(o.source(), o.child(), o.order(), plan.limit());
Contributor Author

I think this is just a bug - no reason why TopN source has to come from limit, OrderBy is much better IMO.

@smalyshev smalyshev marked this pull request as ready for review August 13, 2025 07:05
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Aug 13, 2025
@smalyshev smalyshev requested a review from alex-spies August 13, 2025 07:06
@alex-spies alex-spies left a comment

Looks good! Thanks for improving our abstractions, I think that helps reason about the language and planning more cleanly. Nice!

Only minor comments from my side; go ahead at your own discretion, please!

PostAnalysisVerificationAware,
TelemetryAware,
SortAgnostic,
PipelineBreaker,
Contributor

Thought: Hmm, maybe PipelineBreaker should inherit from ExecutesOn.Coordinator?

Contributor Author

I am not sure yet... Need to think about it a bit more to have a good answer for this.

return; // skip myself
}
if (u instanceof Limit) {
// Limit is ok because it can be moved in by the optimizer
Contributor

Let's maybe add a comment saying that we check for upstream limits before optimization, and thus can be sure that any limits encountered now were duplicated and have a corresponding downstream limit that ensures correctness.
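
The reasoning behind "duplicated, not pushed" can be checked with a toy model. The sketch below assumes an ordered row list and a join that emits at least one output row per input row; `DuplicatedLimitDemo`, `join`, and `limit` are hypothetical helpers, not part of the planner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model: rows are integers, the "join" may expand a row into two.
final class DuplicatedLimitDemo {
    /** Stand-in for a join that emits at least one output row per input row. */
    static List<String> join(List<Integer> rows) {
        List<String> out = new ArrayList<>();
        for (int r : rows) {
            out.add(r + "-a");
            if (r % 2 == 0) {
                out.add(r + "-b"); // even rows match twice
            }
        }
        return out;
    }

    /** Keeps at most the first n rows. */
    static <T> List<T> limit(List<T> rows, int n) {
        return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
    }
}
```

In this model, `limit(join(limit(rows, n)), n)` returns the same rows as `limit(join(rows), n)`, because the upstream copy only drops rows the downstream limit would discard anyway. Dropping the downstream limit, as in `join(limit(rows, n))`, can return more than `n` rows, which is why an upstream limit is only safe when its downstream counterpart exists.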

@smalyshev smalyshev merged commit 37b28e4 into elastic:main Aug 14, 2025
33 checks passed
@smalyshev smalyshev deleted the executes-on branch August 14, 2025 22:15
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 15, 2025
* upstream/main: (32 commits)
  Speed up loading keyword fields with index sorts (elastic#132950)
  Mute org.elasticsearch.index.mapper.LongFieldMapperTests testSyntheticSourceWithTranslogSnapshot elastic#132964
  Simplify EsqlSession (elastic#132848)
  Implement WriteLoadConstraintDecider#canAllocate (elastic#132041)
  Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search/400_synthetic_source/_doc_count} elastic#132965
  Switch to PR-based benchmark pipeline defined in ES repo (elastic#132941)
  Breakdown undesired allocations by shard routing role (elastic#132235)
  Implement v_magnitude function (elastic#132765)
  Introduce execution location marker for better handling of remote/local compatibility (elastic#132205)
  Mute org.elasticsearch.cluster.ClusterInfoServiceIT testMaxQueueLatenciesInClusterInfo elastic#132957
  Unmuting simulate index data stream mapping overrides yaml rest test (elastic#132946)
  Remove CrossClusterCancellationIT.createLocalIndex() (elastic#132952)
  Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetch elastic#132956
  Fix failing UT by adding a required capability (elastic#132947)
  Precompute the BitsetCacheKey hashCode (elastic#132875)
  Adding simulate ingest effective mapping (elastic#132833)
  Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetchMany elastic#132948
  Rename skipping logic to remove hard link to skip_unavailable (elastic#132861)
  Store ignored source in unique stored fields per entry (elastic#132142)
  Add random tests with match_only_text multi-field (elastic#132380)
  ...
joshua-adams-1 pushed a commit to joshua-adams-1/elasticsearch that referenced this pull request Aug 15, 2025
…al compatibility (elastic#132205)

* Introduce execution location marker for better handling of remote/local compatibility
Labels

:Analytics/ES|QL AKA ESQL >non-issue >refactoring Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0

3 participants