Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131286.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131286
summary: Allow remote enrich after LOOKUP JOIN
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword
Production | 3 | Spanish
;

enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with
# _coordinator enrich. What about ROW?

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichSortAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
| SORT message ASC
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

sortRemoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| SORT message ASC
| ENRICH _remote:languages_policy ON language_code
| LIMIT 2
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

###############################################
# LOOKUP JOIN on mixed numerical fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. There's one more in CrossClusterEnrichUnavailableClustersIT btw if we already fixing it :)

// because we stopped it before processing the data
assertThat(
getValuesList(resp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,12 @@ public enum Cap {
*/
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),

/**
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
*/
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,

/**
* MATCH PHRASE function
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {

@Override
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
return p -> {
return child -> {
var source = source(ctx);
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
Mode mode = tuple.v1();
Expand All @@ -482,9 +482,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
}

List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);

// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
LogicalPlan updatedChild = (mode == Mode.REMOTE) == false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
Maybe we should flip the condition to avoid (==) == false?

? child
: child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));

return new Enrich(
source,
p,
updatedChild,
mode,
Literal.keyword(source(ctx.policyName), policyNameString),
matchField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
PhysicalPlan mappedChild = map(unary.child());

//
// TODO - this is hard to follow and needs reworking
// TODO - this is hard to follow, causes bugs and needs reworking
// https://github.com/elastic/elasticsearch/issues/115897
//
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
Expand All @@ -102,6 +102,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Holder<Boolean> hasFragment = new Holder<>(false);

// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
// include the plan up until this ENRICH in the fragment.
var childTransformed = mappedChild.transformUp(f -> {
// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;

public final class AnalyzerTestUtils {

Expand All @@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
emptyInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), verifier);
}

public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier
) {
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier,
Configuration config
) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
config,
new EsqlFunctionRegistry(),
indexResolution,
lookupResolution,
defaultEnrichResolution(),
enrichResolution,
defaultInferenceResolution()
),
verifier
);
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
return new Analyzer(
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
defaultInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
}

public static Analyzer analyzer(Verifier verifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -38,9 +39,13 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
Expand Down Expand Up @@ -2268,6 +2273,78 @@ public void testRemoteLookupJoinWithPipelineBreaker() {
);
}

public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
EnrichResolution enrichResolution = new EnrichResolution();
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.REMOTE,
MATCH_TYPE,
"languages",
"language_code",
"languages_idx",
"mapping-languages.json"
);
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.COORDINATOR,
MATCH_TYPE,
"languages_coord",
"language_code",
"languages_idx",
"mapping-languages.json"
);
var analyzer = AnalyzerTestUtils.analyzer(
loadMapping("mapping-default.json", "test"),
defaultLookupResolution(),
enrichResolution,
TEST_VERIFIER
);

String err = error("""
FROM test
| STATS c = COUNT(*) by languages
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3")
);
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after STATS"));

err = error("""
FROM test
| SORT emp_no
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3"));

err = error("""
FROM test
| LIMIT 2
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3"));

err = error("""
FROM test
| EVAL language_code = languages
| ENRICH _coordinator:languages_coord
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
);
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}

public void testRemoteLookupJoinIsSnapshot() {
// TODO: remove when we allow remote joins in release builds
assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled());
Expand All @@ -2282,7 +2359,6 @@ public void testRemoteLookupJoinIsDisabled() {
() -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code")
);
assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN"));

}

private void checkFullTextFunctionsInStats(String functionInvocation) {
Expand Down
Loading