Conversation

@smalyshev (Contributor) commented Sep 4, 2025

Make verifiers aware of whether we are verifying a general or a local plan. This can be useful in the future. It also removes the skipRemoteEnrichVerification parameter, which is only needed for an ad-hoc fix and is not required for the main logic.

Currently this check should never fail, but when we refactor Enrich handling we want to be sure that we don't accidentally break the semantics and perform a remote enrich on the coordinator side.

Follow-up: add a mode flag and ExecutesOn to LookupJoinExec.

@smalyshev smalyshev requested a review from alex-spies September 4, 2025 21:45
@smalyshev smalyshev marked this pull request as ready for review September 4, 2025 21:46
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Sep 4, 2025
@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-analytical-engine (Team:Analytics)

@smalyshev smalyshev marked this pull request as draft September 4, 2025 21:49
@smalyshev smalyshev marked this pull request as ready for review September 4, 2025 21:53
@alex-spies (Contributor) left a comment:

Yes, I think this makes sense! I think it's not entirely correct in case of upstream lookup joins, though, see below.

public void postPhysicalOptimizationVerification(Failures failures) {
    if (mode == Enrich.Mode.REMOTE) {
        // check that there is no FragmentExec in the child plan - that would mean we're on the wrong side of the remote boundary
        child().forEachDown(FragmentExec.class, f -> {
@alex-spies (Contributor) commented Sep 5, 2025:

Heya, you couldn't really have known this, but we're in the process of improving the planning of lookup joins. We made LookupJoinExec generally contain a FragmentExec in the right-hand branch. So this check is wrong in the case of upstream (remote) lookup joins.

I'd add a test for this case, and I think we need to go and ignore fragments that belong to lookup joins. One way to do that is to additionally traverse the fragment and ensure its EsRelation is not in lookup mode.
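A minimal sketch of what that extra traversal could look like. The `Plan`, `Fragment`, and `Relation` types below are hypothetical stand-ins for the ES|QL plan classes, not the real ones; only the shape of the check (skip fragments whose relations are all in lookup mode) follows the suggestion above:

```java
import java.util.List;

// Hedged sketch: ignore FragmentExecs that belong to a lookup join by peeking
// into the fragment and requiring its relation(s) to be in lookup mode.
// All types here are minimal stand-ins, not the actual ES|QL classes.
public class RemoteEnrichFragmentCheck {
    interface Plan { List<Plan> children(); }

    record Relation(boolean lookupMode) implements Plan {
        public List<Plan> children() { return List.of(); }
    }

    record Fragment(Plan fragment) implements Plan {
        public List<Plan> children() { return List.of(fragment); }
    }

    record Node(List<Plan> kids) implements Plan {
        public List<Plan> children() { return kids; }
    }

    // A fragment "belongs to" a lookup join when every relation inside it is
    // a lookup-mode relation (e.g. EsRelation[...][LOOKUP] in the plan dump).
    static boolean isLookupFragment(Fragment f) {
        return relations(f.fragment()).stream().allMatch(Relation::lookupMode);
    }

    static List<Relation> relations(Plan p) {
        if (p instanceof Relation r) return List.of(r);
        return p.children().stream().flatMap(c -> relations(c).stream()).toList();
    }

    // Verifier condition: a remote ENRICH is misplaced only if the plan below
    // it contains a fragment that is NOT a lookup-join fragment.
    public static boolean hasNonLookupFragment(Plan p) {
        if (p instanceof Fragment f) return isLookupFragment(f) == false;
        return p.children().stream().anyMatch(RemoteEnrichFragmentCheck::hasNonLookupFragment);
    }
}
```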

Contributor:

Alternatively, you could extract the "leftmost leaf" by calling collectLeaves and checking only the first leaf. This is the main source of the plan if the plan is made only from unary and binary execs, because our binary execs have the "special" side on the right.

I also realized there can be a MergeExec which indicates that we're after a FORK. That'd also be wrong for a remote enrich as the merge part of a fork is executed on the coordinator.

Contributor Author:

But if we see LookupJoinExec, aren't we on the coordinator side already anyway?

> I also realized there can be a MergeExec which indicates that we're after a FORK

Can there be MergeExec without fragment? From what I'm seeing, both branches of the MergeExec eventually have fragments, so we'd still reach it anyway.

Contributor Author:

In fact, I am starting to suspect we should never see EnrichExec with remote mode in the global plan regardless of anything else (though we might in the local plan), but I am not 100% sure yet, and I'm also not sure how I would even know whether it is a local or a global plan. But checking for FragmentExec seems to be a reasonable proxy for "is this a global plan?", no?

@alex-spies (Contributor) commented Sep 8, 2025:

> But if we see LookupJoinExec, aren't we on the coordinator side already anyway?

No. Here's an example in our tests.

> Can there be MergeExec without fragment?

I'm not super familiar with FORK/Merge but I see from Mapper.java that each branch is just planned normally. There are plans without fragments, like ROW.

Which reminds me that I don't know how we handle remote ENRICH with ROW. Pathological example, though :)

@alex-spies (Contributor) commented Sep 8, 2025:

> But checking for FragmentExec seems to be a reasonable proxy for "is this a global plan?", no?

It used to, but we have more complex plans now. Lookup joins contain fragments in the right branch.

I think it's more reliable to check for a local plan by walking the plan down to the leaves and checking that the leaf is an EsQueryExec. In case of joins or merges there will be more than one leaf, but I think the best current condition is "there is exactly one EsQueryExec, and it's the leftmost leaf".

If our plans grow in complexity, we might have to explicitly mark them as data node plan, lookup index plan, coordinator plan etc. This is knowledge that is really easier to write into the plan than reverse engineer from its shape.
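The "exactly one EsQueryExec, and it's the leftmost leaf" heuristic could be sketched like this. The node types are hypothetical stand-ins; only collectLeaves mirrors a method name actually mentioned in this thread:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the "walk to the leaves" heuristic: a plan looks like a
// data node (local) plan iff it contains exactly one EsQueryExec and that
// EsQueryExec is the leftmost leaf. Node types are minimal stand-ins.
public class DataNodePlanCheck {
    interface Plan { List<Plan> children(); }

    record EsQueryExec() implements Plan {
        public List<Plan> children() { return List.of(); }
    }

    record OtherLeaf() implements Plan {  // e.g. a lookup fragment or ROW source
        public List<Plan> children() { return List.of(); }
    }

    record BinaryExec(Plan left, Plan right) implements Plan {
        // binary execs keep the "special" side (lookup index, fork branch) on the right
        public List<Plan> children() { return List.of(left, right); }
    }

    // Depth-first, left-to-right leaf collection - approximates collectLeaves().
    static void collectLeaves(Plan p, List<Plan> out) {
        if (p.children().isEmpty()) {
            out.add(p);
            return;
        }
        for (Plan c : p.children()) {
            collectLeaves(c, out);
        }
    }

    public static boolean looksLikeDataNodePlan(Plan p) {
        List<Plan> leaves = new ArrayList<>();
        collectLeaves(p, leaves);
        long esQueries = leaves.stream().filter(l -> l instanceof EsQueryExec).count();
        return esQueries == 1 && leaves.get(0) instanceof EsQueryExec;
    }
}
```

For a lookup join with the lookup index on the right, the leftmost leaf is the EsQueryExec and the check passes; a plan whose main source is not an EsQueryExec fails it.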

Contributor Author:

Remote Enrich is not allowed after FORK right now, so MergeExec cannot appear under a remote Enrich, I think.

Contributor Author:

I tried this query:

FROM sample_data
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1", client_ip=to_string(client_ip)
| LOOKUP JOIN clientips_lookup ON client_ip
| ENRICH _remote:languages_policy ON language_code

and I get this plan:

LimitExec[1000[INTEGER],null]
\_ExchangeExec[[],false]
  \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,languages_policy[KEYWORD],language_code{r}#180,{"match":{"indices":[],"match_field":"language_code","enric
h_fields":["language_name"]}},{=.enrich-languages_policy-1750721426622},[language_name{r}#194]]
\_Limit[1000[INTEGER],true]
  \_Join[LEFT,[client_ip{r}#183],[client_ip{r}#183],[client_ip{f}#190]]
    |_Eval[[1[KEYWORD] AS language_code#180, TOSTRING(client_ip{f}#188) AS client_ip#183]]
    | \_Limit[1000[INTEGER],false]
    |   \_Filter[IN(Connected to 10.1.0.1[KEYWORD],Connected to 10.1.0.2[KEYWORD],message{f}#189)]
    |     \_EsRelation[sample_data][@timestamp{f}#186, client_ip{f}#188, event_duration..]
    \_EsRelation[clientips_lookup][LOOKUP][client_ip{f}#190, env{f}#191]<>]]

Which does not have any fragments inside the join (and in fact doesn't even have an EnrichExec). And this particular verifier does not seem to be called on the local part of the plan? Are there some unmerged parts that I need to test with?

Contributor:

The plan you posted is the coordinator physical plan. There is also the data node physical plan. The physical verifier runs against both.

I checked out your branch and ran this query, then had a look at the trace logs. It shows a LookupJoinExec with a FragmentExec as right child:

ExchangeSinkExec[[@timestamp{f}#30, event_duration{f}#31, message{f}#33, language_code{r}#24, client_ip{r}#27, env{f}#35, language_name{r}#38],false]
\_ProjectExec[[@timestamp{f}#30, event_duration{f}#31, message{f}#33, language_code{r}#24, client_ip{r}#27, env{f}#35, language_name{r}#38]]
  \_FieldExtractExec[@timestamp{f}#30, event_duration{f}#31, message{f}#..]<[],[]>
    \_EnrichExec[REMOTE,match,language_code{r}#24,languages_policy,language_code,{=.enrich-languages_policy-1757401858136},[language_
name{r}#38]]
      \_LimitExec[1000[INTEGER],170]
        \_LookupJoinExec[[client_ip{r}#27],[client_ip{f}#34],[env{f}#35]]
          |_EvalExec[[1[KEYWORD] AS language_code#24, TOSTRING(client_ip{f}#32) AS client_ip#27]]
          | \_FieldExtractExec[client_ip{f}#32]<[],[]>
          |   \_EsQueryExec[sample_data], indexMode[standard], [_doc{f}#39], limit[1000], sort[] estimatedRowSize[286] queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[{...}], tags=[]}]]
          \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
EsRelation[clientips_lookup][LOOKUP][client_ip{f}#34, env{f}#35]<>]]]]]

@alex-spies (Contributor) left a comment:

I like where this is going, but I think the current PR is not fully correct yet (see below). I left a suggestion. Let me know if this works for you.

@smalyshev smalyshev changed the title Add check that remote enrich stays on remote side Refactor verifiers and add remote check Sep 9, 2025
    return false;

boolean hasRemoteEnrich(LogicalPlan optimizedPlan) {
    var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
    return enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE;
Contributor:

Could you please help me understand why we're only checking the first enrich?

Contributor Author:

Tbh I am not sure. But I haven't changed that part, it's exactly the same as it was before. @bpintea may know more about this?

@bpintea (Contributor) commented Sep 10, 2025:

https://www.elastic.co/docs/reference/query-languages/esql/esql-cross-clusters#esql-multi-enrich states: "A _remote enrich command can’t be executed after a _coordinator enrich command."
This is checked in Enrich#checkForPlansForbiddenBeforeRemoteEnrich.

@smalyshev, it might be worth adding a (now missing) comment about why the [0] is chosen to be tried as a REMOTE enrich. 🙏

Contributor Author:

It could be ANY though, no? ANY does not conflict with other types.

Contributor:

This is confusing, but I think we only skip if the topmost ENRICH in the leftmost branch is remote - this is the only situation when the remote enrich hack can lead to shadowing issues that I can think of, because then the remote enrich is the top enrich node in the local plan.

We could skip more widely by looking for any remote enriches; but I think we want to fix the remote enrich hack soon, anyway, so maybe it's better to invest the effort in the real fix.

@alex-spies (Contributor) left a comment:

Very nice, thanks Stas!


if (p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) {
// This check applies only for general physical plans (isLocal == false)
if (isLocal == false && p instanceof ExecutesOn ex && ex.executesOn() == ExecutesOn.ExecuteLocation.REMOTE) {
Contributor:

We could also add a check for the converse - after physical optimization on the remote, there shouldn't be any coordinator-only nodes in the plan.
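A minimal sketch of that converse check, using an ExecutesOn-style marker as discussed in this PR. The types below are hypothetical stand-ins, not the actual ES|QL interfaces:

```java
import java.util.List;

// Hedged sketch of the converse check: after physical optimization of the
// remote (data node) plan, no node may be marked coordinator-only. The
// marker mirrors the ExecutesOn idea from this PR, but the types are stand-ins.
public class CoordinatorOnlyCheck {
    enum ExecuteLocation { COORDINATOR, REMOTE, ANY }

    interface Plan {
        List<Plan> children();
        default ExecuteLocation executesOn() { return ExecuteLocation.ANY; }
    }

    record Exec(ExecuteLocation loc, List<Plan> kids) implements Plan {
        public List<Plan> children() { return kids; }
        public ExecuteLocation executesOn() { return loc; }
    }

    // Run this only when verifying the local/remote plan (isLocal == true):
    // any coordinator-only node there means the plan is wrong.
    public static boolean hasCoordinatorOnlyNode(Plan p) {
        if (p.executesOn() == ExecuteLocation.COORDINATOR) return true;
        return p.children().stream().anyMatch(CoordinatorOnlyCheck::hasCoordinatorOnlyNode);
    }
}
```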


private LogicalVerifier() {}

public static LogicalVerifier getLocalVerifier() {
    return new LogicalVerifier(true);
Contributor:

nit: instead of re-instantiating this all the time, we could hand out a singleton. Not sure it makes a difference, though.

We could also make these methods on PostOptimizationPhasePlanVerifier, as they work the same for the logical and physical verifiers.
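The singleton suggestion could be sketched as follows. The `Verifier` class below is a hypothetical stand-in for LogicalVerifier/PhysicalVerifier, just illustrating one pre-built instance per flavour instead of a fresh object per query:

```java
// Hedged sketch of the singleton suggestion: hand out one pre-built instance
// per flavour (local vs. coordinator). Safe only because the verifier holds
// no per-query state. "Verifier" is a stand-in, not the real ES|QL class.
public class VerifierSingletons {
    public static final class Verifier {
        public final boolean isLocal;

        private Verifier(boolean isLocal) {  // private: no ad-hoc instances
            this.isLocal = isLocal;
        }

        // shared, eagerly-built instances for both roles a node can play
        public static final Verifier LOCAL = new Verifier(true);
        public static final Verifier COORDINATOR = new Verifier(false);
    }
}
```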

Contributor:

> instead of re-instantiating this all the time, we could hand out a singleton.

+1. I guess we'll want this for both flavours (logical, physical).

> Not sure it makes a difference, though.

Maybe not performance-wise, but still has a "better feel". Besides, a node can play both roles, so both types (local/coordinator) would eventually be used.

Contributor Author:

Not sure why the variable it gets assigned to is not static anyway? Probably we should make it so?

    return new LogicalVerifier(true);
}

public static LogicalVerifier getGeneralVerifier() {
Contributor:

I think so far we only have a "local" thing (verifier, mapper, optimizer etc.) and the unqualified other variant, which is the coordinator one. Let's avoid introducing a notion of a "general" thing (which wouldn't be "general", as it wouldn't apply locally :) ).
We should use "coordinator" wherever we need to denote a non-local execution.

Contributor Author:

Well, maybe "global" because it applies to the whole plan?

Contributor:

> Well, maybe "global" because it applies to the whole plan?

My thinking is that there isn't a "global plan", executed "globally, everywhere": there's a coordinator plan and one or more (as the pipelines break) node plans, no?



abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);

// This is a temporary workaround to skip verification when there is a remote enrich, due to a bug
abstract boolean hasRemoteEnrich(P optimizedPlan);
Contributor:

Nit: hasRemoteEnrich() is a bit of a misleading name, since it's not actually a property of the verifier, but of the thing the verifier verifies. And this method's sole role is to stop/skip the verification. skipVerification() could be repurposed for other cases, but even if we'll remove it after we fix the bug[*], I'd find something like "skipOnRemoteEnrich()" (or some other variation) a bit more suggestive.

[*] "due to a bug" should contain a pointer to announced bug. We could add just a (#118531) if the URL is too long.

    assertCCSExecutionInfoDetails(executionInfo);
}

// No renames, no KEEP
Contributor:

Might be good to make a new test out of this, possibly adding a reference to why we add this test (optional). And/or detail why KEEP is "detrimental" to the test. 🙏


public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {

    public static final LogicalVerifier INSTANCE = new LogicalVerifier();
Contributor:

When working on micro-optimizations I remember seeing some cost associated with initializing the lists of rules in various optimizers. Here LogicalVerifier looks much simpler, but I wonder if we could reuse a single static object for the local verifier and one for the general one? This would avoid creating new instances for each query execution. I acknowledge it is much cheaper here than the actual rules initialization elsewhere.

@smalyshev smalyshev marked this pull request as draft October 3, 2025 22:08
@smalyshev (Contributor Author) commented:

This is absorbed into #134967

@smalyshev smalyshev closed this Oct 8, 2025
Labels
:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.3.0