Merged
27 commits
5c6b3f4
Allow remote enrich after LOOKUP JOIN
alex-spies Jul 15, 2025
3aaf1b4
Update docs/changelog/131286.yaml
alex-spies Jul 15, 2025
4efe247
Update docs/changelog/131286.yaml
alex-spies Jul 17, 2025
0f65dd5
Make LJs remote before remote enrich
alex-spies Jul 17, 2025
6d52153
Update 131286.yaml
alex-spies Jul 18, 2025
db33e26
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Jul 25, 2025
cd57460
Small fixes
smalyshev Jul 25, 2025
f0c3d53
Update docs/changelog/131940.yaml
smalyshev Jul 25, 2025
f488b7d
Delete docs/changelog/131286.yaml
smalyshev Jul 25, 2025
75f0210
[CI] Auto commit changes from spotless
Jul 25, 2025
627ea94
Simplify
smalyshev Jul 25, 2025
8fa80a9
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Jul 28, 2025
378fc00
Remove verifier ban
smalyshev Jul 28, 2025
54245d9
[CI] Auto commit changes from spotless
Jul 28, 2025
39caa37
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Jul 29, 2025
cdebea5
fix comment
smalyshev Jul 29, 2025
66e7767
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Aug 14, 2025
9d40e10
Add tests
smalyshev Aug 14, 2025
95baad7
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Aug 18, 2025
59e02b4
Fix test
smalyshev Aug 18, 2025
baaeec6
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Aug 19, 2025
842c659
For now, all inference is happening on coordinator, so make it clear
smalyshev Aug 22, 2025
27ff9ae
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Aug 22, 2025
f9efe24
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Aug 27, 2025
516c5ee
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Sep 2, 2025
20cbdfe
Moar tests
smalyshev Sep 2, 2025
f53a301
Merge branch 'main' into remote-enrich-after-lookup-join
smalyshev Sep 3, 2025
5 changes: 5 additions & 0 deletions docs/changelog/131940.yaml
@@ -0,0 +1,5 @@
pr: 131940
summary: Allow remote enrich after LOOKUP JOIN
area: ES|QL
type: enhancement
issues: []
Contributor

Test case suggestion:

  • Multiple lookup joins and multiple enriches, also intertwined LJ -> ENRICH -> LJ -> ENRICH
  • Cases with several unary plans in between the LJ and the ENRICH

@@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword
Production | 3 | Spanish
;

enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with
# _coordinator enrich. What about ROW?

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichSortAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
| SORT message ASC
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

sortRemoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| SORT message ASC
Contributor

We could also add tests with limits in between the two. Also, multiple TopNs/Limits, maybe even silly stuff like TopN -> Limit -> TopN -> Limit (which should become a single TopN if the numbers keep going down, but not necessarily otherwise)

Contributor Author

Since this is the lookup join suite, though, I'm not sure multiple TopNs really belong here: I can't run TopN before either a remote join or a remote enrich, and just running several TopNs afterwards doesn't really have much to do with either join or enrich.

Contributor

> I can't run TopN before either remote join or remote enrich

TopN can be run before remote enrich, right? That's what the remote enrich hack enables. In fact, this very query should optimize to a LJ -> Top2 -> remote ENRICH because the limit will be pushed down.

Having multiple sorts and limits between the LJ and remote ENRICH should work, even if the query'd be a bit silly.

Contributor Author

OK I'll see if I can add something like that.

| ENRICH _remote:languages_policy ON language_code
| LIMIT 2
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;
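
The review thread above mentions that stacked sorts and limits can collapse, for example a limit applied on top of a sort becoming a single TopN. A minimal Java sketch of that idea follows, assuming a single sort key. `LimitCollapseSketch` and its record types are hypothetical illustrations, not the ES|QL optimizer classes, and the real rules may differ (in particular, this sketch does not try to merge TopNs with different sort keys).

```java
// Hypothetical, simplified plan model; none of these types are the real ES|QL classes.
final class LimitCollapseSketch {
    interface Plan {}
    record Source(String index) implements Plan {}
    record Sort(Plan child, String key) implements Plan {}
    record Limit(Plan child, int n) implements Plan {}
    record TopN(Plan child, String key, int n) implements Plan {}

    // Rewrite bottom-up: simplify the child first, then try to fold a Limit into it.
    static Plan simplify(Plan plan) {
        if (plan instanceof Sort sort) {
            return new Sort(simplify(sort.child()), sort.key());
        }
        if (plan instanceof TopN topN) {
            return new TopN(simplify(topN.child()), topN.key(), topN.n());
        }
        if (plan instanceof Limit limit) {
            Plan child = simplify(limit.child());
            if (child instanceof Limit innerLimit) {
                // LIMIT a | LIMIT b keeps at most min(a, b) rows.
                return new Limit(innerLimit.child(), Math.min(limit.n(), innerLimit.n()));
            }
            if (child instanceof TopN innerTopN) {
                // A limit over a TopN keeps the order and shrinks the row count.
                return new TopN(innerTopN.child(), innerTopN.key(), Math.min(limit.n(), innerTopN.n()));
            }
            if (child instanceof Sort innerSort) {
                // SORT key | LIMIT n is a TopN.
                return new TopN(innerSort.child(), innerSort.key(), limit.n());
            }
            return new Limit(child, limit.n());
        }
        return plan; // leaf node: nothing to rewrite
    }

    public static void main(String[] args) {
        // SORT message | LIMIT 5 | LIMIT 2
        Plan query = new Limit(new Limit(new Sort(new Source("sample_data"), "message"), 5), 2);
        System.out.println(simplify(query));
    }
}
```

In this toy model, `SORT message | LIMIT 5 | LIMIT 2` folds into one TopN node with a row count of 2.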

###############################################
# LOOKUP JOIN on mixed numerical fields

@@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
- // Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
+ // Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
// because we stopped it before processing the data
assertThat(
getValuesList(resp),

@@ -30,7 +30,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
- * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path"
+ * This IT test is the dual of CrossClusterEnrichIT, which tests "happy path"
* and this one tests unavailable cluster scenarios using (most of) the same tests.
*/
public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase {

@@ -1277,6 +1277,12 @@ public enum Cap {
*/
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),

/**
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
*/
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,

/**
* MATCH PHRASE function
*/

@@ -465,7 +465,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {

@Override
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
- return p -> {
+ return child -> {
var source = source(ctx);
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
Mode mode = tuple.v1();
@@ -484,9 +484,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
}

List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);

// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
if (mode == Mode.REMOTE) {
child = child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));
}

return new Enrich(
source,
- p,
+ child,
mode,
Literal.keyword(source(ctx.policyName), policyNameString),
matchField,
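
The parser change above rewrites the child plan so that every upstream LOOKUP JOIN is flagged as remote whenever the ENRICH runs in remote mode. As a rough, self-contained sketch of that idea: the `RemoteEnrichSketch` class, its record types, and the `markJoinsRemote` helper below are all hypothetical stand-ins, not the Elasticsearch `LogicalPlan`, `LookupJoin`, or `transformDown` API.

```java
// Hypothetical, simplified plan model; none of these types are the real ES|QL classes.
final class RemoteEnrichSketch {
    interface Plan {}
    record Source(String index) implements Plan {}
    record Eval(Plan child, String expression) implements Plan {}
    record LookupJoin(Plan left, String lookupIndex, boolean remote) implements Plan {}
    record Enrich(Plan child, String policy, boolean remoteMode) implements Plan {}

    // Walk the tree from the top and rebuild it with every LookupJoin flagged as remote.
    static Plan markJoinsRemote(Plan plan) {
        if (plan instanceof LookupJoin join) {
            return new LookupJoin(markJoinsRemote(join.left()), join.lookupIndex(), true);
        }
        if (plan instanceof Eval eval) {
            return new Eval(markJoinsRemote(eval.child()), eval.expression());
        }
        if (plan instanceof Enrich inner) {
            return new Enrich(markJoinsRemote(inner.child()), inner.policy(), inner.remoteMode());
        }
        return plan; // leaf node such as Source: nothing to rewrite
    }

    // Build an Enrich node; a remote-mode enrich forces upstream joins to be remote too.
    static Enrich enrich(Plan child, String policy, boolean remoteMode) {
        Plan input = remoteMode ? markJoinsRemote(child) : child;
        return new Enrich(input, policy, remoteMode);
    }

    public static void main(String[] args) {
        // FROM sample_data | EVAL ... | LOOKUP JOIN message_types_lookup | ENRICH _remote:languages_policy
        Plan query = new LookupJoin(
            new Eval(new Source("sample_data"), "language_code = \"1\""),
            "message_types_lookup",
            false
        );
        System.out.println(enrich(query, "languages_policy", true));
    }
}
```

Because the plan nodes are immutable in this sketch, the rewrite returns new nodes rather than mutating the existing join, which mirrors how the diff constructs a fresh `LookupJoin` with the remote flag set.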

@@ -23,7 +23,6 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
- import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.io.IOException;
@@ -34,7 +33,7 @@
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

- public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
+ public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {

public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";


@@ -13,6 +13,7 @@
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -24,7 +25,8 @@
public abstract class InferencePlan<PlanType extends InferencePlan<PlanType>> extends UnaryPlan
implements
SortAgnostic,
- GeneratingPlan<InferencePlan<PlanType>> {
+ GeneratingPlan<InferencePlan<PlanType>>,
+ ExecutesOn.Coordinator {

public static final String INFERENCE_ID_OPTION_NAME = "inference_id";
public static final List<String> VALID_INFERENCE_OPTION_NAMES = List.of(INFERENCE_ID_OPTION_NAME);
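
Moving `ExecutesOn.Coordinator` from `Completion` and `Rerank` up to `InferencePlan` means every inference plan inherits the marker. A small sketch of that pattern, using hypothetical stand-in types (`ExecutesOnSketch`, `InferencePlanSketch`, and so on) rather than the real ES|QL classes:

```java
// Hypothetical stand-ins illustrating a marker interface declared once on a base class.
final class ExecutesOnSketch {
    interface ExecutesOnCoordinator {} // marker: no methods, only signals a placement constraint

    abstract static class InferencePlanSketch implements ExecutesOnCoordinator {}

    static final class CompletionSketch extends InferencePlanSketch {}

    static final class RerankSketch extends InferencePlanSketch {}

    static final class EvalSketch {} // unrelated node without the marker

    // A planner-style check: subclasses carry the marker without redeclaring it.
    static boolean mustRunOnCoordinator(Object node) {
        return node instanceof ExecutesOnCoordinator;
    }

    public static void main(String[] args) {
        System.out.println(mustRunOnCoordinator(new CompletionSketch()));
        System.out.println(mustRunOnCoordinator(new EvalSketch()));
    }
}
```

Declaring the marker on the base class means a future inference plan subclass cannot forget it.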

@@ -26,7 +26,6 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
- import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

@@ -38,7 +37,7 @@
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

- public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator {
+ public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";

@@ -100,6 +100,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Holder<Boolean> hasFragment = new Holder<>(false);

// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
// include the plan up until this ENRICH in the fragment.
var childTransformed = mappedChild.transformUp(f -> {
// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
@@ -118,7 +120,10 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
return unaryExec.child();
}
}
- // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
+ // Here we have the following possibilities:
+ // 1. LeafExec - should resolve to FragmentExec or we can ignore it
+ // 2. Join - must be remote, and thus will go inside FragmentExec
+ // 3. Fork/MergeExec - not currently allowed with remote enrich
return f;
});


@@ -358,4 +358,74 @@ public void testRemoteLookupJoinWithPipelineBreaker() {
// Since FORK, RERANK, COMPLETION and CHANGE_POINT are not supported on remote indices, we can't check them here against the remote
// LOOKUP JOIN
}

public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
Contributor

These tests don't rely on post-optimization verification, right? Maybe it'd be a bit more fitting to have them be part of the VerifierTests.

Contributor Author

Yes, these are from postAnalysisVerification so maybe I should move them.

Contributor Author

No wait, I'm wrong: only LIMIT is tested pre-optimizer; the rest is tested post-optimizer. So I could move the LIMIT one to the Analyzer part, but tbh I'd prefer to keep them all together.

EnrichResolution enrichResolution = new EnrichResolution();
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.REMOTE,
MATCH_TYPE,
"languages",
"language_code",
"languages_idx",
"mapping-languages.json"
);
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.COORDINATOR,
MATCH_TYPE,
"languages_coord",
"language_code",
"languages_idx",
"mapping-languages.json"
);
var analyzer = AnalyzerTestUtils.analyzer(
loadMapping("mapping-default.json", "test"),
defaultLookupResolution(),
enrichResolution,
TEST_VERIFIER
);

String err = error("""
FROM test
| STATS c = COUNT(*) by languages
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3")
Contributor

Welp, the error message may have to be improved in the future. But I know that'd require more effort as the validation for remote LJ requires it to be marked as remote already before optimization.

Contributor Author

Yeah this is not the most obvious result here. Not sure how to make it better - technically the message is correct, but probably doesn't explain well what's going on.

);

err = error("""
FROM test
| SORT emp_no
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3"));

err = error("""
FROM test
| LIMIT 2
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3"));

err = error("""
FROM test
| EVAL language_code = languages
| ENRICH _coordinator:languages_coord
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
);
}
}