@@ -13,6 +13,9 @@
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -30,7 +33,7 @@

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class EnrichExec extends UnaryExec implements EstimatesRowSize {
public class EnrichExec extends UnaryExec implements EstimatesRowSize, PostPhysicalOptimizationVerificationAware {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"EnrichExec",
@@ -220,4 +223,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(super.hashCode(), mode, matchType, matchField, policyName, policyMatchField, concreteIndices, enrichFields);
}

@Override
public void postPhysicalOptimizationVerification(Failures failures) {
if (mode == Enrich.Mode.REMOTE) {
// check that there is no FragmentExec in the child plan - that would mean we're on the wrong side of the remote boundary
child().forEachDown(FragmentExec.class, f -> {
Contributor

@alex-spies alex-spies Sep 5, 2025

Heya, you can't really know that, but we're in the process of improving the planning of lookup joins. We made LookupJoinExec generally contain a FragmentExec in the right-hand branch. So this check is wrong in the case of upstream (remote) lookup joins.

I'd add a test for this case, and I think we need to go and ignore fragments that belong to lookup joins. One way to do that is to additionally traverse the fragment and ensure its EsRelation is not in lookup mode.
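A minimal sketch of that second idea, using toy stand-in types rather than the real ES|QL plan classes (`Node`, `Relation`, `Fragment`, `Op` and both helper methods are hypothetical names for illustration): a fragment only counts against the remote enrich if the logical plan it wraps contains a relation that is not in lookup mode.

```java
import java.util.List;

// Toy model of a physical plan; these types are illustrative, not the real ES|QL classes.
final class LookupAwareFragmentCheck {
    enum IndexMode { STANDARD, LOOKUP }

    interface Node {
        List<Node> children();
    }

    // Stand-in for an EsRelation inside a fragment's logical plan.
    record Relation(String index, IndexMode mode) implements Node {
        public List<Node> children() { return List.of(); }
    }

    // Stand-in for FragmentExec: a physical leaf that wraps a logical plan.
    record Fragment(Node logical) implements Node {
        public List<Node> children() { return List.of(); }
    }

    // Any other operator (EvalExec, LookupJoinExec, Join, ...).
    record Op(String name, List<Node> kids) implements Node {
        public List<Node> children() { return kids; }
    }

    // True if the tree contains a relation that is NOT in lookup mode.
    static boolean hasNonLookupRelation(Node node) {
        if (node instanceof Relation r) {
            return r.mode() != IndexMode.LOOKUP;
        }
        return node.children().stream().anyMatch(LookupAwareFragmentCheck::hasNonLookupRelation);
    }

    // True if the plan contains a fragment that is more than a lookup join's right-hand side.
    static boolean hasNonLookupFragment(Node plan) {
        if (plan instanceof Fragment f) {
            return hasNonLookupRelation(f.logical());
        }
        return plan.children().stream().anyMatch(LookupAwareFragmentCheck::hasNonLookupFragment);
    }
}
```

With this shape, a fragment that wraps only a lookup-mode relation is ignored, while a fragment that still wraps the main index is reported.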

Contributor

Alternatively, you could extract the "leftmost leaf" by calling collectLeaves and checking only the first leaf. This is the main source of the plan if the plan is made only from unary and binary execs, because our binary execs have the "special" side on the right.

I also realized there can be a MergeExec which indicates that we're after a FORK. That'd also be wrong for a remote enrich as the merge part of a fork is executed on the coordinator.
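For illustration, the leftmost-leaf idea could look roughly like this. It is a sketch on a toy tree, not the real plan classes: `Node`, `collectLeaves` and `mainSourceIsFragment` here are hypothetical re-implementations, and it assumes children are stored left to right so that the first collected leaf is the main source.

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan tree; node names mirror the exec classes discussed above, but this is not the real API.
final class LeftmostLeafCheck {
    record Node(String name, List<Node> children) {
        static Node of(String name, Node... children) {
            return new Node(name, List.of(children));
        }
    }

    // Depth-first, left-to-right collection of all leaves.
    static List<Node> collectLeaves(Node plan) {
        List<Node> leaves = new ArrayList<>();
        collect(plan, leaves);
        return leaves;
    }

    private static void collect(Node node, List<Node> out) {
        if (node.children().isEmpty()) {
            out.add(node);
            return;
        }
        for (Node child : node.children()) {
            collect(child, out);
        }
    }

    // Only the first (leftmost) leaf is inspected; fragments on the right side of a join are ignored.
    static boolean mainSourceIsFragment(Node plan) {
        return "FragmentExec".equals(collectLeaves(plan).get(0).name());
    }
}
```

A LookupJoinExec whose right child is a FragmentExec passes this check as long as its left branch bottoms out in something other than a fragment.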

Contributor Author

But if we see LookupJoinExec, aren't we on the coordinator side already anyway?

> I also realized there can be a MergeExec which indicates that we're after a FORK

Can there be MergeExec without fragment? From what I'm seeing, both branches of the MergeExec eventually have fragments, so we'd still reach it anyway.

Contributor Author

In fact, I am starting to suspect we should never see EnrichExec with remote mode in the global plan regardless of anything else (though we might in the local plan), but I am not 100% sure yet, and I am also not sure how I would even know whether it is a local or a global plan. But checking for FragmentExec seems to be a reasonable proxy for "is this a global plan?", no?

Contributor

@alex-spies alex-spies Sep 8, 2025

> But if we see LookupJoinExec, aren't we on the coordinator side already anyway?

No. Here's an example in our tests.

> Can there be MergeExec without fragment?

I'm not super familiar with FORK/Merge but I see from Mapper.java that each branch is just planned normally. There are plans without fragments, like ROW.

Which reminds me that I don't know how we handle remote ENRICH with ROW. Pathological example, though :)

Contributor

@alex-spies alex-spies Sep 8, 2025

> But checking for FragmentExec seems to be a reasonable proxy for "is this a global plan?", no?

It used to be, but we have more complex plans now. Lookup joins contain fragments in the right branch.

I think it's more reliable to check for a local plan by walking the plan to the leaf and checking that it is an EsQueryExec. In case of joins or merges, there will be more than one leaf - but I think the best current condition is "there is exactly one EsQueryExec, and it's the leftmost leaf".
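
Spelled out on a toy tree, that condition might look like the following. This is only a sketch under stated assumptions: `Node`, `leaves` and `looksLikeDataNodePlan` are hypothetical helpers, nodes are matched by name, and children are assumed to be ordered left to right.

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan tree used only to illustrate the shape check; not the real ES|QL plan classes.
final class DataNodePlanShape {
    record Node(String name, List<Node> children) {
        static Node of(String name, Node... children) {
            return new Node(name, List.of(children));
        }
    }

    // Depth-first, left-to-right list of leaves.
    static List<Node> leaves(Node plan) {
        List<Node> out = new ArrayList<>();
        if (plan.children().isEmpty()) {
            out.add(plan);
        } else {
            for (Node child : plan.children()) {
                out.addAll(leaves(child));
            }
        }
        return out;
    }

    // "There is exactly one EsQueryExec, and it's the leftmost leaf."
    static boolean looksLikeDataNodePlan(Node plan) {
        List<Node> all = leaves(plan);
        long queryLeaves = all.stream().filter(l -> l.name().equals("EsQueryExec")).count();
        return queryLeaves == 1 && all.get(0).name().equals("EsQueryExec");
    }
}
```

A plan whose only leaf is a FragmentExec, or a ROW-style plan with no EsQueryExec at all, is not treated as a data node plan by this check.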

If our plans grow in complexity, we might have to explicitly mark them as a data node plan, lookup index plan, coordinator plan, etc. This is knowledge that is really easier to write into the plan than to reverse engineer from its shape.
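
To make the explicit-marking idea concrete, here is a rough sketch. Nothing like `PlanKind` exists in the code under review as far as this thread shows; the enum, the `EnrichMode` stand-in and `verifyEnrich` are all hypothetical, and the sketch only rejects the coordinator case.

```java
import java.util.Optional;

// Hypothetical sketch: the plan carries its kind, so verification does not have to infer it from shape.
final class PlanKindSketch {
    // Where a physical plan is meant to run (assumed names).
    enum PlanKind { COORDINATOR, DATA_NODE, LOOKUP_INDEX }

    // Toy stand-in for the enrich mode checked in the diff above.
    enum EnrichMode { ANY, COORDINATOR, REMOTE }

    // Returns a failure message if a remote enrich ended up in a coordinator plan, otherwise empty.
    static Optional<String> verifyEnrich(EnrichMode mode, PlanKind kind) {
        if (mode == EnrichMode.REMOTE && kind == PlanKind.COORDINATOR) {
            return Optional.of("Remote enrich cannot be performed on the coordinator side");
        }
        return Optional.empty();
    }
}
```

The check then no longer depends on where fragments happen to appear in the tree.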

Contributor Author

Remote Enrich is not allowed after FORK right now, so I think MergeExec cannot appear under a remote Enrich.

Contributor Author

I tried this query:

FROM sample_data
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1", client_ip=to_string(client_ip)
| LOOKUP JOIN clientips_lookup ON client_ip
| ENRICH _remote:languages_policy ON language_code

and I get this plan:

LimitExec[1000[INTEGER],null]
\_ExchangeExec[[],false]
  \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Enrich[REMOTE,languages_policy[KEYWORD],language_code{r}#180,{"match":{"indices":[],"match_field":"language_code","enrich_fields":["language_name"]}},{=.enrich-languages_policy-1750721426622},[language_name{r}#194]]
\_Limit[1000[INTEGER],true]
  \_Join[LEFT,[client_ip{r}#183],[client_ip{r}#183],[client_ip{f}#190]]
    |_Eval[[1[KEYWORD] AS language_code#180, TOSTRING(client_ip{f}#188) AS client_ip#183]]
    | \_Limit[1000[INTEGER],false]
    |   \_Filter[IN(Connected to 10.1.0.1[KEYWORD],Connected to 10.1.0.2[KEYWORD],message{f}#189)]
    |     \_EsRelation[sample_data][@timestamp{f}#186, client_ip{f}#188, event_duration..]
    \_EsRelation[clientips_lookup][LOOKUP][client_ip{f}#190, env{f}#191]<>]]

This plan does not have any fragments inside the join (and in fact doesn't even have an EnrichExec). And this particular verifier does not seem to be called on the local part of the plan? Are there some unmerged parts that I need to test with?

Contributor

The plan you posted is the coordinator physical plan. There is also the data node physical plan. The physical verifier runs against both.

I checked out your branch and ran this query, then had a look at the trace logs. It shows a LookupJoinExec with a FragmentExec as right child:

ExchangeSinkExec[[@timestamp{f}#30, event_duration{f}#31, message{f}#33, language_code{r}#24, client_ip{r}#27, env{f}#35, language_name{r}#38],false]
\_ProjectExec[[@timestamp{f}#30, event_duration{f}#31, message{f}#33, language_code{r}#24, client_ip{r}#27, env{f}#35, language_name{r}#38]]
  \_FieldExtractExec[@timestamp{f}#30, event_duration{f}#31, message{f}#..]<[],[]>
    \_EnrichExec[REMOTE,match,language_code{r}#24,languages_policy,language_code,{=.enrich-languages_policy-1757401858136},[language_name{r}#38]]
      \_LimitExec[1000[INTEGER],170]
        \_LookupJoinExec[[client_ip{r}#27],[client_ip{f}#34],[env{f}#35]]
          |_EvalExec[[1[KEYWORD] AS language_code#24, TOSTRING(client_ip{f}#32) AS client_ip#27]]
          | \_FieldExtractExec[client_ip{f}#32]<[],[]>
          |   \_EsQueryExec[sample_data], indexMode[standard], [_doc{f}#39], limit[1000], sort[] estimatedRowSize[286] queryBuilderAndTags [[QueryBuilderAndTags{queryBuilder=[{...}], tags=[]}]]
          \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
EsRelation[clientips_lookup][LOOKUP][client_ip{f}#34, env{f}#35]<>]]]]]

failures.add(Failure.fail(this, "Remote enrich cannot be performed on the coordinator side"));
});
}
}
}