Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131426.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131426
summary: Disallow remote enrich after lu join
area: ES|QL
type: bug
issues:
- 129372
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ public MultiClusterSpecIT(
"NullifiedJoinKeyToPurgeTheJoin",
"SortBeforeAndAfterJoin",
"SortEvalBeforeLookup",
"SortBeforeAndAfterMultipleJoinAndMvExpand"
"SortBeforeAndAfterMultipleJoinAndMvExpand",
"LookupJoinAfterTopNAndRemoteEnrich",
// Lookup join after LIMIT is not supported in CCS yet
"LookupJoinAfterLimitAndRemoteEnrich"
);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4773,3 +4773,101 @@ FROM sample_data_ts_nanos
2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
;

###############################################
# LOOKUP JOIN and ENRICH
###############################################

enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;


lookupJoinAfterEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| ENRICH languages_policy ON language_code
| LOOKUP JOIN message_types_lookup ON message
;

message:keyword | language_code:keyword | language_name:keyword | type:keyword
Connected to 10.1.0.1 | 1 | English | Success
;


lookupJoinAfterRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| LOOKUP JOIN message_types_lookup ON message
;

message:keyword | language_code:keyword | language_name:keyword | type:keyword
Connected to 10.1.0.1 | 1 | English | Success
;


lookupJoinAfterLimitAndRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LIMIT 1
| ENRICH _remote:languages_policy ON language_code
| EVAL enrich_language_name = language_name, language_code = language_code::integer
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP message, enrich_language_name, language_name, country.keyword
| SORT language_name, country.keyword
;

message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
Connected to 10.1.0.1 | English | English | Canada
Connected to 10.1.0.1 | English | English | United States of America
Connected to 10.1.0.1 | English | English | null
Connected to 10.1.0.1 | English | null | United Kingdom
;


lookupJoinAfterTopNAndRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| SORT message
| LIMIT 1
| ENRICH _remote:languages_policy ON language_code
| EVAL enrich_language_name = language_name, language_code = language_code::integer
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP message, enrich_language_name, language_name, country.keyword
| SORT language_name, country.keyword
;

message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
Connected to 10.1.0.1 | English | English | Canada
Connected to 10.1.0.1 | English | English | United States of America
Connected to 10.1.0.1 | English | English | null
Connected to 10.1.0.1 | English | null | United Kingdom
;
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
* retaining the originating cluster and restructing pages for routing, which might be complicated.
*/
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
boolean[] agg = { false };
boolean[] enrichCoord = { false };
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
// in separate FORK branches which are valid by themselves.
plan.forEachUp(Enrich.class, enrich -> { checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures); });
}

/**
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
*/
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
if (enrich.mode != Mode.REMOTE) {
return;
}

// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not completely sure, but if FORK needs merging on the coordinator, then we definitely want it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does FORK need to be on coordinator always? I mean, if that's currently the case then yes, we must respect it here and provide the check for it, but if the clauses inside FORK are not coordinator-only (like STATS) in general why there should be the coordinator stage? Maybe I don't understand the semantics of it yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FORK is 9.1+ though, it may be easier to take care of it in a separate pull. And maybe even not mention FORK if we're merging it down to 8.18, otherwise somebody reading 8.18 branch code could be quite confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar enough with FORK to make a quick call, so I think it's best if that is handled separately and I opened #131445 to track it.

That said, the javadoc here implies that the nodes after a FORK all execute on the coordinator, making FORK actually a pipeline breaker.

// https://github.com/elastic/elasticsearch/issues/131445
boolean[] aggregate = { false };
boolean[] coordinatorOnlyEnrich = { false };
boolean[] lookupJoin = { false };

plan.forEachUp(UnaryPlan.class, u -> {
enrich.forEachUp(LogicalPlan.class, u -> {
if (u instanceof Aggregate) {
agg[0] = true;
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
enrichCoord[0] = true;
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
if (agg[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
if (enrichCoord[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}
aggregate[0] = true;
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
coordinatorOnlyEnrich[0] = true;
} else if (u instanceof LookupJoin) {
lookupJoin[0] = true;
}
});

if (aggregate[0]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add new commands (FORK?) to this list, maybe we can start considering the creation of a flag interface to identify them (similar to current PipelineBreaker, but it's not exactly the same thing), and avoid the list of instanceof

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having an interface that identifies the "not remotable" components would be nice, however it may be tricky because some operations may depend on options (like Enrich itself which can be "coordinator" or "remote" or "any"). Still, it might save some effort for simpler operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. We've come across this when working on remote LOOKUP JOINs, already, see my comment here. I think we may want to refine the concept of pipeline breakers as part of improving remote ENRICH in #115897. I'll add a comment there to track these observations :)

failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
if (coordinatorOnlyEnrich[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}
if (lookupJoin[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
PhysicalPlan mappedChild = map(unary.child());

//
// TODO - this is hard to follow and needs reworking
// TODO - this is hard to follow, causes bugs and needs reworking
// https://github.com/elastic/elasticsearch/issues/115897
//
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;

public final class AnalyzerTestUtils {

Expand All @@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
emptyInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), verifier);
}

public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier
) {
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier,
Configuration config
) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
config,
new EsqlFunctionRegistry(),
indexResolution,
lookupResolution,
defaultEnrichResolution(),
enrichResolution,
defaultInferenceResolution()
),
verifier
);
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
return new Analyzer(
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
defaultInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
}

public static Analyzer analyzer(Verifier verifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -38,9 +39,13 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
Expand Down Expand Up @@ -2343,7 +2348,60 @@ public void testRemoteLookupJoinIsDisabled() {
() -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code")
);
assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN"));
}

public void testRemoteEnrichAfterLookupJoin() {
EnrichResolution enrichResolution = new EnrichResolution();
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.REMOTE,
MATCH_TYPE,
"languages",
"language_code",
"languages_idx",
"mapping-languages.json"
);
var analyzer = AnalyzerTestUtils.analyzer(
loadMapping("mapping-default.json", "test"),
defaultLookupResolution(),
enrichResolution,
TEST_VERIFIER
);

query("""
FROM test
| EVAL language_code = languages
| ENRICH _remote:languages ON language_code
| LOOKUP JOIN languages_lookup ON language_code
""", analyzer);

String err = error("""
FROM test
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));

err = error("""
FROM test
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
| LOOKUP JOIN languages_lookup ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));

err = error("""
FROM test
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| EVAL x = 1
| MV_EXPAND language_name
| DISSECT language_name "%{foo}"
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
}

private void checkFullTextFunctionsInStats(String functionInvocation) {
Expand Down
Loading