Skip to content

Commit d68b945

Browse files
authored
[9.2] Replace remote Enrich hack with proper handling of remote Enrich (elastic#134967) (elastic#136901)
* Replace remote Enrich hack with proper handling of remote Enrich (elastic#134967) - The remote Enrich hack and the verification workaround that it necessitates are removed - Limit with remote Enrich is handled by duplicating the limits around Enrich - TopN with remote Enrich is handled by duplicating around Enrich and adjusting projections - Introduced `CardinalityPreserving` interface to mark nodes that preserve cardinality - Introduced the capability to run optimizations only on coordinator plan (`CoordinatorOnly` interface) - Added post-optimizer check that ensures remote Enrich is not moved to the coordinator plan (cherry picked from commit fd6e962) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java * fix merge
1 parent 701ffbb commit d68b945

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1385
-213
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,20 @@ public static Limit asLimit(Object node, Integer limitLiteral, Boolean duplicate
582582
return limit;
583583
}
584584

585+
public static Limit asLimit(Object node, Integer limitLiteral, Boolean duplicated, Boolean local) {
586+
Limit limit = as(node, Limit.class);
587+
if (limitLiteral != null) {
588+
assertEquals(as(limit.limit(), Literal.class).value(), limitLiteral);
589+
}
590+
if (duplicated != null) {
591+
assertEquals(limit.duplicated(), duplicated);
592+
}
593+
if (local != null) {
594+
assertEquals(limit.local(), local);
595+
}
596+
return limit;
597+
}
598+
585599
public static Map<String, EsField> loadMapping(String name) {
586600
return LoadMapping.loadMapping(name);
587601
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,61 @@ public void testTopNThenEnrichRemote() {
402402
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
403403
assertCCSExecutionInfoDetails(executionInfo);
404404
}
405+
406+
// No renames, no KEEP - this is required to verify that ENRICH does not break sort with fields it overrides
407+
query = """
408+
FROM *:events,events
409+
| eval ip= TO_STR(host)
410+
| SORT timestamp, user, ip
411+
| LIMIT 5
412+
| ENRICH _remote:hosts
413+
""";
414+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
415+
assertThat(
416+
getValuesList(resp),
417+
equalTo(
418+
List.of(
419+
List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows"),
420+
List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS"),
421+
Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null),
422+
List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android"),
423+
List.of("192.168.1.6", 2L, "sergio", "192.168.1.6", "iOS")
424+
)
425+
)
426+
);
427+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
428+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
429+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
430+
assertCCSExecutionInfoDetails(executionInfo);
431+
}
432+
}
433+
434+
public void testLimitWithCardinalityChange() {
435+
String query = String.format(Locale.ROOT, """
436+
FROM *:events,events
437+
| eval ip= TO_STR(host)
438+
| LIMIT 10
439+
| WHERE user != "andres"
440+
| %s
441+
""", enrichHosts(Enrich.Mode.REMOTE));
442+
// This is currently not supported, because WHERE is not cardinality preserving
443+
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
444+
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [LIMIT 10]@3:3"));
445+
}
446+
447+
public void testTopNTwiceThenEnrichRemote() {
448+
String query = String.format(Locale.ROOT, """
449+
FROM *:events,events
450+
| eval ip= TO_STR(host)
451+
| SORT timestamp
452+
| LIMIT 9
453+
| SORT ip, user
454+
| LIMIT 5
455+
| ENRICH _remote:hosts
456+
""", enrichHosts(Enrich.Mode.REMOTE));
457+
// This is currently not supported, because we cannot handle a double TopN with remote ENRICH
458+
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
459+
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [SORT timestamp]"));
405460
}
406461

407462
public void testLimitThenEnrichRemote() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostOptimizationVerificationAware.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
public interface PostOptimizationVerificationAware {
1717

18+
/**
19+
* Marker interface for implementations whose verification should only be run on the coordinator; local verifiers skip it
20+
*/
21+
interface CoordinatorOnly extends PostOptimizationVerificationAware {}
22+
1823
/**
1924
* Validates the implementing expression - discovered failures are reported to the given
2025
* {@link Failures} class.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
12-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
13-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
12+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
1413
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch;
1514
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics;
1615
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull;
1716
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint;
18-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPropagateEmptyRelation;
1917
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceDateTruncBucketWithRoundTo;
2018
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull;
2119
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort;
@@ -24,6 +22,7 @@
2422
import org.elasticsearch.xpack.esql.rule.Rule;
2523

2624
import java.util.ArrayList;
25+
import java.util.Arrays;
2726
import java.util.List;
2827

2928
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
@@ -34,12 +33,12 @@
3433
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
3534
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
3635
*
37-
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
36+
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()}
3837
* and {@link LogicalPlanOptimizer#cleanup()}
3938
*/
4039
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
4140

42-
private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
41+
private final LogicalVerifier verifier = LogicalVerifier.LOCAL_INSTANCE;
4342

4443
private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
4544
new Batch<>(
@@ -53,7 +52,7 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
5352
new ReplaceDateTruncBucketWithRoundTo()
5453
),
5554
localOperators(),
56-
cleanup()
55+
localCleanup()
5756
);
5857

5958
public LocalLogicalPlanOptimizer(LocalLogicalOptimizerContext localLogicalOptimizerContext) {
@@ -67,30 +66,38 @@ protected List<Batch<LogicalPlan>> batches() {
6766

6867
@SuppressWarnings("unchecked")
6968
private static Batch<LogicalPlan> localOperators() {
70-
var operators = operators(true);
71-
var rules = operators.rules();
72-
List<Rule<?, LogicalPlan>> newRules = new ArrayList<>(rules.length);
69+
return localBatch(operators(), new ReplaceStringCasingWithInsensitiveRegexMatch());
70+
}
71+
72+
@SuppressWarnings("unchecked")
73+
private static Batch<LogicalPlan> localCleanup() {
74+
return localBatch(cleanup());
75+
}
76+
77+
@SuppressWarnings("unchecked")
78+
private static Batch<LogicalPlan> localBatch(Batch<LogicalPlan> batch, Rule<?, LogicalPlan>... additionalRules) {
79+
Rule<?, LogicalPlan>[] rules = batch.rules();
7380

74-
// apply updates to existing rules that have different applicability locally
75-
for (var r : rules) {
76-
switch (r) {
77-
case PropagateEmptyRelation ignoredPropagate -> newRules.add(new LocalPropagateEmptyRelation());
78-
// skip it: once a fragment contains an Agg, this can no longer be pruned, which the rule can do
79-
case ReplaceStatsFilteredAggWithEval ignoredReplace -> {
81+
List<Rule<?, LogicalPlan>> newRules = new ArrayList<>(rules.length);
82+
for (Rule<?, LogicalPlan> r : rules) {
83+
if (r instanceof OptimizerRules.LocalAware<?> localAware) {
84+
Rule<?, LogicalPlan> local = localAware.local();
85+
if (local != null) {
86+
newRules.add(local);
8087
}
81-
default -> newRules.add(r);
88+
} else {
89+
newRules.add(r);
8290
}
8391
}
8492

85-
// add rule that should only apply locally
86-
newRules.add(new ReplaceStringCasingWithInsensitiveRegexMatch());
93+
newRules.addAll(Arrays.asList(additionalRules));
8794

88-
return operators.with(newRules.toArray(Rule[]::new));
95+
return batch.with(newRules.toArray(Rule[]::new));
8996
}
9097

9198
public LogicalPlan localOptimize(LogicalPlan plan) {
9299
LogicalPlan optimized = execute(plan);
93-
Failures failures = verifier.verify(optimized, true, plan.output());
100+
Failures failures = verifier.verify(optimized, plan.output());
94101
if (failures.hasFailures()) {
95102
throw new VerificationException(failures);
96103
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic
3636

3737
private static final List<Batch<PhysicalPlan>> RULES = rules(true);
3838

39-
private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
39+
private final PhysicalVerifier verifier = PhysicalVerifier.LOCAL_INSTANCE;
4040

4141
public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
4242
super(context);
@@ -47,7 +47,7 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
4747
}
4848

4949
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
50-
Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
50+
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
5151
if (failures.hasFailures()) {
5252
throw new VerificationException(failures);
5353
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineBinaryComparisons;
1717
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineDisjunctions;
1818
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineEvals;
19+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineLimitTopN;
1920
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineProjections;
2021
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ConstantFolding;
2122
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter;
2223
import org.elasticsearch.xpack.esql.optimizer.rules.logical.FoldNull;
24+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichLimit;
25+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.HoistRemoteEnrichTopN;
2326
import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight;
2427
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PartiallyFoldCase;
2528
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
@@ -85,22 +88,22 @@
8588
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
8689
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
8790
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
88-
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
91+
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and refers to many different
8992
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
9093
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
9194
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
9295
* after many iterations, although hitting that is considered a bug)</li>
9396
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
9497
* </ul>
9598
*
96-
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
97-
* {@link LocalLogicalPlanOptimizer} layer.</p>
99+
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied
100+
* at the {@link LocalLogicalPlanOptimizer} layer.</p>
98101
*/
99102
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
100103

101104
private static final List<RuleExecutor.Batch<LogicalPlan>> RULES = List.of(
102105
substitutions(),
103-
operators(false),
106+
operators(),
104107
new Batch<>("Skip Compute", new SkipQueryOnLimitZero()),
105108
cleanup(),
106109
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
@@ -115,7 +118,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
115118
public LogicalPlan optimize(LogicalPlan verified) {
116119
var optimized = execute(verified);
117120

118-
Failures failures = verifier.verify(optimized, false, verified.output());
121+
Failures failures = verifier.verify(optimized, verified.output());
119122
if (failures.hasFailures()) {
120123
throw new VerificationException(failures);
121124
}
@@ -163,10 +166,11 @@ protected static Batch<LogicalPlan> substitutions() {
163166
);
164167
}
165168

166-
protected static Batch<LogicalPlan> operators(boolean local) {
169+
protected static Batch<LogicalPlan> operators() {
167170
return new Batch<>(
168171
"Operator Optimization",
169-
new CombineProjections(local),
172+
new HoistRemoteEnrichLimit(),
173+
new CombineProjections(),
170174
new CombineEvals(),
171175
new PruneEmptyPlans(),
172176
new PropagateEmptyRelation(),
@@ -212,6 +216,13 @@ protected static Batch<LogicalPlan> operators(boolean local) {
212216
}
213217

214218
protected static Batch<LogicalPlan> cleanup() {
215-
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields());
219+
return new Batch<>(
220+
"Clean Up",
221+
new ReplaceLimitAndSortAsTopN(),
222+
new HoistRemoteEnrichTopN(),
223+
new ReplaceRowAsLocalRelation(),
224+
new PropgateUnmappedFields(),
225+
new CombineLimitTopN()
226+
);
216227
}
217228
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,18 @@
1111
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
1212
import org.elasticsearch.xpack.esql.common.Failures;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
14-
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1514
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1615

1716
import java.util.ArrayList;
1817
import java.util.List;
1918
import java.util.function.BiConsumer;
2019

2120
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {
21+
public static final LogicalVerifier LOCAL_INSTANCE = new LogicalVerifier(true);
22+
public static final LogicalVerifier INSTANCE = new LogicalVerifier(false);
2223

23-
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
24-
25-
private LogicalVerifier() {}
26-
27-
@Override
28-
boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
29-
if (skipRemoteEnrichVerification) {
30-
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
31-
var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
32-
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
33-
return true;
34-
}
35-
}
36-
return false;
24+
private LogicalVerifier(boolean isLocal) {
25+
super(isLocal);
3726
}
3827

3928
@Override
@@ -44,14 +33,16 @@ void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures
4433
PlanConsistencyChecker.checkPlan(p, depFailures);
4534

4635
if (failures.hasFailures() == false) {
47-
if (p instanceof PostOptimizationVerificationAware pova) {
36+
if (p instanceof PostOptimizationVerificationAware pova
37+
&& (pova instanceof PostOptimizationVerificationAware.CoordinatorOnly && isLocal) == false) {
4838
pova.postOptimizationVerification(failures);
4939
}
5040
if (p instanceof PostOptimizationPlanVerificationAware popva) {
5141
checkers.add(popva.postOptimizationPlanVerification());
5242
}
5343
p.forEachExpression(ex -> {
54-
if (ex instanceof PostOptimizationVerificationAware va) {
44+
if (ex instanceof PostOptimizationVerificationAware va
45+
&& (va instanceof PostOptimizationVerificationAware.CoordinatorOnly && isLocal) == false) {
5546
va.postOptimizationVerification(failures);
5647
}
5748
});

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
3939
}
4040

4141
PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
42-
Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
42+
Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes);
4343
if (failures.hasFailures()) {
4444
throw new VerificationException(failures);
4545
}

0 commit comments

Comments
 (0)