Skip to content

Commit e789039

Browse files
authored
Fixing remote ENRICH by pushing the Enrich inside FragmentExec (#114665)
* Fixing remote ENRICH by pushing the Enrich inside FragmentExec * Improve handling of more complex cases such as several enriches
1 parent b4edc3d commit e789039

File tree

5 files changed

+195
-25
lines changed

5 files changed

+195
-25
lines changed

docs/changelog/114665.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 114665
2+
summary: Fixing remote ENRICH by pushing the Enrich inside `FragmentExec`
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 105095

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Arrays;
4848
import java.util.Collection;
4949
import java.util.Collections;
50+
import java.util.Comparator;
5051
import java.util.List;
5152
import java.util.Locale;
5253
import java.util.Map;
@@ -469,27 +470,112 @@ public void testEnrichRemoteWithVendor() {
469470
}
470471
}
471472

473+
public void testEnrichRemoteWithVendorNoSort() {
474+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
475+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
476+
boolean responseExpectMeta = includeCCSMetadata.v2();
477+
478+
for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) {
479+
var query = String.format(Locale.ROOT, """
480+
FROM *:events,events
481+
| LIMIT 100
482+
| eval ip= TO_STR(host)
483+
| %s
484+
| %s
485+
| stats c = COUNT(*) by vendor
486+
""", enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE));
487+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
488+
var values = getValuesList(resp);
489+
values.sort(Comparator.comparing(o -> (String) o.get(1), Comparator.nullsLast(Comparator.naturalOrder())));
490+
assertThat(
491+
values,
492+
equalTo(
493+
List.of(
494+
List.of(6L, "Apple"),
495+
List.of(7L, "Microsoft"),
496+
List.of(1L, "Redhat"),
497+
List.of(2L, "Samsung"),
498+
List.of(1L, "Sony"),
499+
List.of(2L, "Suse"),
500+
Arrays.asList(3L, (String) null)
501+
)
502+
)
503+
);
504+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
505+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
506+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
507+
assertCCSExecutionInfoDetails(executionInfo);
508+
}
509+
}
510+
}
511+
472512
public void testTopNThenEnrichRemote() {
513+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
514+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
515+
boolean responseExpectMeta = includeCCSMetadata.v2();
516+
473517
String query = String.format(Locale.ROOT, """
474518
FROM *:events,events
475519
| eval ip= TO_STR(host)
476-
| SORT ip
520+
| SORT timestamp, user, ip
477521
| LIMIT 5
478-
| %s
522+
| %s | KEEP host, timestamp, user, os
479523
""", enrichHosts(Enrich.Mode.REMOTE));
480-
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
481-
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
524+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
525+
assertThat(
526+
getValuesList(resp),
527+
equalTo(
528+
List.of(
529+
List.of("192.168.1.2", 1L, "andres", "Windows"),
530+
List.of("192.168.1.3", 1L, "matthew", "MacOS"),
531+
Arrays.asList("192.168.1.25", 1L, "park", (String) null),
532+
List.of("192.168.1.5", 2L, "akio", "Android"),
533+
List.of("192.168.1.6", 2L, "sergio", "iOS")
534+
)
535+
)
536+
);
537+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
538+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
539+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
540+
assertCCSExecutionInfoDetails(executionInfo);
541+
}
482542
}
483543

484544
public void testLimitThenEnrichRemote() {
545+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
546+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
547+
boolean responseExpectMeta = includeCCSMetadata.v2();
548+
485549
String query = String.format(Locale.ROOT, """
486550
FROM *:events,events
487-
| LIMIT 10
551+
| LIMIT 25
488552
| eval ip= TO_STR(host)
489-
| %s
553+
| %s | KEEP host, timestamp, user, os
490554
""", enrichHosts(Enrich.Mode.REMOTE));
491-
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
492-
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
555+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
556+
var values = getValuesList(resp);
557+
values.sort(
558+
Comparator.comparingLong((List<Object> o) -> (Long) o.get(1))
559+
.thenComparing(o -> (String) o.get(0))
560+
.thenComparing(o -> (String) o.get(2))
561+
);
562+
assertThat(
563+
values.subList(0, 5),
564+
equalTo(
565+
List.of(
566+
List.of("192.168.1.2", 1L, "andres", "Windows"),
567+
Arrays.asList("192.168.1.25", 1L, "park", (String) null),
568+
List.of("192.168.1.3", 1L, "matthew", "MacOS"),
569+
List.of("192.168.1.5", 2L, "akio", "Android"),
570+
List.of("192.168.1.5", 2L, "simon", "Android")
571+
)
572+
)
573+
);
574+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
575+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
576+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
577+
assertCCSExecutionInfoDetails(executionInfo);
578+
}
493579
}
494580

495581
public void testAggThenEnrichRemote() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -609,22 +609,15 @@ private static void checkForSortableDataTypes(LogicalPlan p, Set<Failure> localF
609609
*/
610610
private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
611611
boolean[] agg = { false };
612-
boolean[] limit = { false };
613612
boolean[] enrichCoord = { false };
614613

615614
plan.forEachUp(UnaryPlan.class, u -> {
616-
if (u instanceof Limit) {
617-
limit[0] = true; // TODO: Make Limit then enrich_remote work
618-
}
619615
if (u instanceof Aggregate) {
620616
agg[0] = true;
621617
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
622618
enrichCoord[0] = true;
623619
}
624620
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
625-
if (limit[0]) {
626-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LIMIT"));
627-
}
628621
if (agg[0]) {
629622
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
630623
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@
5252
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
5353
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
5454
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
55+
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
5556

5657
import java.util.List;
58+
import java.util.concurrent.atomic.AtomicBoolean;
5759

5860
/**
5961
* <p>This class is part of the planner</p>
@@ -104,6 +106,46 @@ public PhysicalPlan map(LogicalPlan p) {
104106
//
105107
// Unary Plan
106108
//
109+
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
110+
// When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
111+
// We're only going to do it on the coordinator node.
112+
// The way we're going to do it is as follows:
113+
// 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
114+
// 2. Put this Enrich under it, removing everything that was below it previously.
115+
// 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
116+
// FragmentExec.
117+
// 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
118+
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
119+
120+
var child = map(enrich.child());
121+
AtomicBoolean hasFragment = new AtomicBoolean(false);
122+
123+
var childTransformed = child.transformUp((f) -> {
124+
// Once we reached FragmentExec, we stuff our Enrich under it
125+
if (f instanceof FragmentExec) {
126+
hasFragment.set(true);
127+
return new FragmentExec(p);
128+
}
129+
if (f instanceof EnrichExec enrichExec) {
130+
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
131+
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
132+
return enrichExec.child();
133+
}
134+
if (f instanceof UnaryExec unaryExec) {
135+
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
136+
return f;
137+
} else {
138+
return unaryExec.child();
139+
}
140+
}
141+
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
142+
return f;
143+
});
144+
145+
if (hasFragment.get()) {
146+
return childTransformed;
147+
}
148+
}
107149

108150
if (p instanceof UnaryPlan ua) {
109151
var child = map(ua.child());

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
import static org.hamcrest.Matchers.nullValue;
173173
import static org.hamcrest.Matchers.startsWith;
174174

175-
// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
175+
// @TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug")
176176
public class PhysicalPlanOptimizerTests extends ESTestCase {
177177

178178
private static final String PARAM_FORMATTING = "%1$s";
@@ -5851,14 +5851,14 @@ public void testEnrichBeforeLimit() {
58515851
| EVAL employee_id = to_str(emp_no)
58525852
| ENRICH _remote:departments
58535853
| LIMIT 10""");
5854-
var enrich = as(plan, EnrichExec.class);
5855-
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
5856-
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
5857-
var eval = as(enrich.child(), EvalExec.class);
5858-
var finalLimit = as(eval.child(), LimitExec.class);
5854+
var finalLimit = as(plan, LimitExec.class);
58595855
var exchange = as(finalLimit.child(), ExchangeExec.class);
58605856
var fragment = as(exchange.child(), FragmentExec.class);
5861-
var partialLimit = as(fragment.fragment(), Limit.class);
5857+
var enrich = as(fragment.fragment(), Enrich.class);
5858+
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
5859+
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
5860+
var evalFragment = as(enrich.child(), Eval.class);
5861+
var partialLimit = as(evalFragment.child(), Limit.class);
58625862
as(partialLimit.child(), EsRelation.class);
58635863
}
58645864
}
@@ -5901,13 +5901,21 @@ public void testLimitThenEnrich() {
59015901
}
59025902

59035903
public void testLimitThenEnrichRemote() {
5904-
var error = expectThrows(VerificationException.class, () -> physicalPlan("""
5904+
var plan = physicalPlan("""
59055905
FROM test
59065906
| LIMIT 10
59075907
| EVAL employee_id = to_str(emp_no)
59085908
| ENRICH _remote:departments
5909-
"""));
5910-
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after LIMIT"));
5909+
""");
5910+
var finalLimit = as(plan, LimitExec.class);
5911+
var exchange = as(finalLimit.child(), ExchangeExec.class);
5912+
var fragment = as(exchange.child(), FragmentExec.class);
5913+
var enrich = as(fragment.fragment(), Enrich.class);
5914+
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
5915+
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
5916+
var evalFragment = as(enrich.child(), Eval.class);
5917+
var partialLimit = as(evalFragment.child(), Limit.class);
5918+
as(partialLimit.child(), EsRelation.class);
59115919
}
59125920

59135921
public void testEnrichBeforeTopN() {
@@ -5961,6 +5969,23 @@ public void testEnrichBeforeTopN() {
59615969
var eval = as(enrich.child(), Eval.class);
59625970
as(eval.child(), EsRelation.class);
59635971
}
5972+
{
5973+
var plan = physicalPlan("""
5974+
FROM test
5975+
| EVAL employee_id = to_str(emp_no)
5976+
| ENRICH _remote:departments
5977+
| SORT department
5978+
| LIMIT 10""");
5979+
var topN = as(plan, TopNExec.class);
5980+
var exchange = as(topN.child(), ExchangeExec.class);
5981+
var fragment = as(exchange.child(), FragmentExec.class);
5982+
var partialTopN = as(fragment.fragment(), TopN.class);
5983+
var enrich = as(partialTopN.child(), Enrich.class);
5984+
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
5985+
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
5986+
var eval = as(enrich.child(), Eval.class);
5987+
as(eval.child(), EsRelation.class);
5988+
}
59645989
}
59655990

59665991
public void testEnrichAfterTopN() {
@@ -6000,6 +6025,24 @@ public void testEnrichAfterTopN() {
60006025
var partialTopN = as(fragment.fragment(), TopN.class);
60016026
as(partialTopN.child(), EsRelation.class);
60026027
}
6028+
{
6029+
var plan = physicalPlan("""
6030+
FROM test
6031+
| SORT emp_no
6032+
| LIMIT 10
6033+
| EVAL employee_id = to_str(emp_no)
6034+
| ENRICH _remote:departments
6035+
""");
6036+
var topN = as(plan, TopNExec.class);
6037+
var exchange = as(topN.child(), ExchangeExec.class);
6038+
var fragment = as(exchange.child(), FragmentExec.class);
6039+
var enrich = as(fragment.fragment(), Enrich.class);
6040+
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
6041+
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
6042+
var evalFragment = as(enrich.child(), Eval.class);
6043+
var partialTopN = as(evalFragment.child(), TopN.class);
6044+
as(partialTopN.child(), EsRelation.class);
6045+
}
60036046
}
60046047

60056048
public void testManyEnrich() {

0 commit comments

Comments
 (0)