Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131286.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131286
summary: Allow remote enrich after LOOKUP JOIN
area: ES|QL
type: bug
issues:
- 129372
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword
Production | 3 | Spanish
;

enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with
# _coordinator enrich. What about ROW?

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichSortAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
| SORT message ASC
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

sortRemoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| SORT message ASC
| ENRICH _remote:languages_policy ON language_code
| LIMIT 2
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

###############################################
# LOOKUP JOIN on mixed numerical fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. There's one more in CrossClusterEnrichUnavailableClustersIT, btw, if we're already fixing it :)

// because we stopped it before processing the data
assertThat(
getValuesList(resp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,12 @@ public enum Cap {
*/
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),

/**
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
*/
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,

/**
* MATCH PHRASE function
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
Expand Down Expand Up @@ -102,7 +103,13 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Holder<Boolean> hasFragment = new Holder<>(false);

// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
// include the plan up until this ENRICH in the fragment.
var childTransformed = mappedChild.transformUp(f -> {
if (f instanceof BinaryExec be) {
// Remove any LOOKUP JOIN or inline Join from INLINE STATS to avoid double execution
return be.left();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main fix: don't duplicate the join node after a remote enrich.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think there's a saner approach. Any LOOKUP JOIN before a remote ENRICH is implicitly remote, too. We should treat them as such, otherwise we will have to redo a lot of validation and look into edge cases that are already solved for remote LOOKUP JOIN.

This approach is implemented in 0f65dd5.

// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
hasFragment.set(true);
Expand Down
Loading