Skip to content

Commit 9e09c22

Browse files
authored
ESQL: More care when pushing expression to load (elastic#139412)
With this change we only push if the EVAL/WHERE/STATS has exactly one "leaf" node ancestor *AND* it's valid for pushing. This prevents pushing into "StubRelations" like we get with this plan: ``` FROM k8s-downsampled | INLINE STATS tx_max = MAX(network.eth0.tx) BY pod ``` In the future we should be able to push if we can push into *all* relations rather than checking that there is just one. But baby steps. Closes elastic#138620
1 parent a28ac9e commit 9e09c22

File tree

3 files changed

+110
-55
lines changed

3 files changed

+110
-55
lines changed

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,12 +391,6 @@ tests:
391391
- class: org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT
392392
method: testWaitForSnapshot
393393
issue: https://github.com/elastic/elasticsearch/issues/138669
394-
- class: org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizerTests
395-
method: testAggregateMetricDoubleInlineStats
396-
issue: https://github.com/elastic/elasticsearch/issues/138620
397-
- class: org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceDateTruncBucketWithRoundToTests
398-
method: testAggregateMetricDoubleInlineStats
399-
issue: https://github.com/elastic/elasticsearch/issues/138620
400394
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
401395
method: test {yaml=index/100_field_name_length_limit/Test field name length limit}
402396
issue: https://github.com/elastic/elasticsearch/issues/138768

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java

Lines changed: 91 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2525
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2626
import org.elasticsearch.xpack.esql.plan.logical.Project;
27+
import org.elasticsearch.xpack.esql.plan.logical.Row;
28+
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
2729
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
2830
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
2931

3032
import java.util.ArrayList;
3133
import java.util.HashMap;
34+
import java.util.IdentityHashMap;
3235
import java.util.List;
3336
import java.util.Map;
3437

@@ -74,25 +77,98 @@ public class PushExpressionsToFieldLoad extends ParameterizedRule<LogicalPlan, L
7477

7578
@Override
7679
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext context) {
77-
Rule rule = new Rule(context, plan);
80+
Rule rule = new Rule(context);
7881
return plan.transformDown(LogicalPlan.class, rule::doRule);
7982
}
8083

84+
/**
85+
* Lazily scans the plan for "primaries". A "primary" here is an {@link EsRelation}
86+
* we can push a field load into.
87+
* <p>
88+
* Every node in the plan in the plan can be traced "down" to a leaf - the source
89+
* of all of its data. This rule can only push expressions into {@link EsRelation},
90+
* but there are lots of other kinds of leaf nodes like {@link StubRelation} and
91+
* {@link Row}. If a node has any of those unsupported ancestors then {@link #primariesFor}
92+
* will return an empty {@link List}. This is the signal the rest of the code uses
93+
* for "can't push".
94+
* </p>
95+
*/
96+
private class Primaries {
97+
/**
98+
* A map from each node to all of its "primaries". The empty list is special here - it means that the node's
99+
* parent doesn't support pushing.
100+
* <p>
101+
* Note: The primary itself will be in the map, pointing to itself.
102+
* </p>
103+
*/
104+
private Map<LogicalPlan, List<EsRelation>> primaries = new IdentityHashMap<>();
105+
106+
/**
107+
* Find "primaries" for a node. Returning the empty list is special here - it
108+
* means that the node's ancestors contain a node to which we cannot push.
109+
*/
110+
List<EsRelation> primariesFor(LogicalPlan plan) {
111+
scanSubtree(plan);
112+
return primaries.get(plan);
113+
}
114+
115+
/**
116+
* Recursively scan the tree under {@code plan}, visiting ancestors
117+
* before children, and ignoring any trees we've scanned before.
118+
*/
119+
private void scanSubtree(LogicalPlan plan) {
120+
if (primaries.containsKey(plan)) {
121+
return;
122+
}
123+
if (plan.children().isEmpty()) {
124+
onLeaf(plan);
125+
} else {
126+
for (LogicalPlan child : plan.children()) {
127+
scanSubtree(child);
128+
}
129+
onInner(plan);
130+
}
131+
}
132+
133+
private void onLeaf(LogicalPlan plan) {
134+
if (plan instanceof EsRelation rel) {
135+
if (rel.indexMode() == IndexMode.LOOKUP) {
136+
primaries.put(plan, List.of());
137+
} else {
138+
primaries.put(rel, List.of(rel));
139+
}
140+
} else {
141+
primaries.put(plan, List.of());
142+
}
143+
}
144+
145+
private void onInner(LogicalPlan plan) {
146+
List<EsRelation> result = new ArrayList<>(plan.children().size());
147+
for (LogicalPlan child : plan.children()) {
148+
List<EsRelation> childPrimaries = primaries.get(child);
149+
assert childPrimaries != null : "scanned depth first " + child;
150+
if (childPrimaries.isEmpty()) {
151+
log.trace("{} unsupported primaries {}", plan, child);
152+
primaries.put(plan, List.of());
153+
return;
154+
}
155+
result.addAll(childPrimaries);
156+
}
157+
log.trace("{} primaries {}", plan, result);
158+
primaries.put(plan, result);
159+
}
160+
}
161+
81162
private class Rule {
82163
private final Map<Attribute.IdIgnoringWrapper, Attribute> addedAttrs = new HashMap<>();
83164

84165
private final LocalLogicalOptimizerContext context;
85-
private final LogicalPlan plan;
166+
private final Primaries primaries = new Primaries();
86167

87-
/**
88-
* The primary indices, lazily initialized.
89-
*/
90-
private List<EsRelation> primaries;
91168
private boolean addedNewAttribute = false;
92169

93-
private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) {
170+
private Rule(LocalLogicalOptimizerContext context) {
94171
this.context = context;
95-
this.plan = plan;
96172
}
97173

98174
private LogicalPlan doRule(LogicalPlan plan) {
@@ -115,7 +191,7 @@ private LogicalPlan doRule(LogicalPlan plan) {
115191
private LogicalPlan transformPotentialInvocation(LogicalPlan plan) {
116192
LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> {
117193
if (e instanceof BlockLoaderExpression ble) {
118-
return transformExpression(e, ble);
194+
return transformExpression(plan, e, ble);
119195
}
120196
return e;
121197
});
@@ -130,12 +206,16 @@ private LogicalPlan transformPotentialInvocation(LogicalPlan plan) {
130206
return new EsqlProject(Source.EMPTY, transformedPlan, transformedPlan.output());
131207
}
132208

133-
private Expression transformExpression(Expression e, BlockLoaderExpression ble) {
209+
private Expression transformExpression(LogicalPlan nodeWithExpression, Expression e, BlockLoaderExpression ble) {
134210
BlockLoaderExpression.PushedBlockLoaderExpression fuse = ble.tryPushToFieldLoading(context.searchStats());
135211
if (fuse == null) {
136212
return e;
137213
}
138-
if (anyPrimaryContains(fuse.field()) == false) {
214+
List<EsRelation> planPrimaries = primaries.primariesFor(nodeWithExpression);
215+
log.trace("found primaries {} {}", nodeWithExpression, planPrimaries);
216+
if (planPrimaries.size() != 1) {
217+
// Empty list means that we can't push.
218+
// >1 primary is currently unsupported, though we expect to support it later.
139219
return e;
140220
}
141221
var preference = context.configuration().pragmas().fieldExtractPreference();
@@ -184,26 +264,5 @@ private Expression replaceFieldsForFieldTransformations(Expression e, BlockLoade
184264
addedAttrs.put(key, newFunctionAttr);
185265
return newFunctionAttr;
186266
}
187-
188-
private List<EsRelation> primaries() {
189-
if (primaries == null) {
190-
primaries = new ArrayList<>(2);
191-
plan.forEachUp(EsRelation.class, r -> {
192-
if (r.indexMode() != IndexMode.LOOKUP) {
193-
primaries.add(r);
194-
}
195-
});
196-
}
197-
return primaries;
198-
}
199-
200-
private boolean anyPrimaryContains(FieldAttribute attr) {
201-
for (EsRelation primary : primaries()) {
202-
if (primary.outputSet().contains(attr)) {
203-
return true;
204-
}
205-
}
206-
return false;
207-
}
208267
}
209268
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
4545
import org.elasticsearch.xpack.esql.expression.function.fulltext.SingleFieldFullTextFunction;
4646
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
47+
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateMetricDouble;
4748
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
4849
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Length;
4950
import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith;
@@ -1438,8 +1439,11 @@ public void testAggregateMetricDoubleTSCommand() {
14381439
assertTrue(esRelation.output().contains(sumfieldAttr));
14391440
}
14401441

1442+
/**
1443+
* Proves that we do <strong>not</strong> push to a stub-relation even if it contains
1444+
* a field we could otherwise push. Stub relations don't have a "push" concept.
1445+
*/
14411446
public void testAggregateMetricDoubleInlineStats() {
1442-
// TODO: modify below when we handle fusing and StubRelations properly
14431447
assumeTrue("requires push", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled());
14441448
String query = """
14451449
FROM k8s-downsampled
@@ -1451,37 +1455,35 @@ public void testAggregateMetricDoubleInlineStats() {
14511455

14521456
LogicalPlan plan = localPlan(plan(query, tsAnalyzer), new EsqlTestUtils.TestSearchStats());
14531457

1454-
// EsqlProject[[@timestamp{f}#15, cluster{f}#16, pod{f}#17, network.eth0.tx{f}#34, tx_max{r}#5]]
1458+
// EsqlProject[[@timestamp{f}#972, cluster{f}#973, pod{f}#974, network.eth0.tx{f}#991, tx_max{r}#962]]
14551459
var project = as(plan, Project.class);
14561460
assertThat(Expressions.names(project.projections()), contains("@timestamp", "cluster", "pod", "network.eth0.tx", "tx_max"));
1457-
// TopN[[Order[@timestamp{f}#15,ASC,LAST], Order[cluster{f}#16,ASC,LAST], Order[pod{f}#17,ASC,LAST]],9[INTEGER],false]
1461+
// TopN[[Order[@timestamp{f}#972,ASC,LAST], Order[cluster{f}#973,ASC,LAST], Order[pod{f}#974,ASC,LAST]],9[INTEGER],false]
14581462
var topN = as(project.child(), TopN.class);
1459-
// InlineJoin[LEFT,[pod{f}#17],[pod{r}#17]]
1463+
// InlineJoin[LEFT,[pod{f}#974],[pod{r}#974]]
14601464
var inlineJoin = as(topN.child(), InlineJoin.class);
1461-
// Aggregate[[pod{f}#17],[MAX($$MAX(network.eth>$MAX$0{r$}#39,true[BOOLEAN],PT0S[TIME_DURATION]) AS tx_max#5, pod{f}#17]]
1465+
// Aggregate[[pod{f}#974],[MAX($$MAX(network.eth>$MAX$0{r$}#996,true[BOOLEAN],PT0S[TIME_DURATION]) AS tx_max#962, pod{f}#974
14621466
var aggregate = as(inlineJoin.right(), Aggregate.class);
14631467
assertThat(aggregate.groupings(), hasSize(1));
14641468
assertThat(aggregate.aggregates(), hasSize(2));
14651469
as(Alias.unwrap(aggregate.aggregates().get(0)), Max.class);
1466-
// Eval[[$$network.eth0.tx$AMD_MAX$1489455250{f$}#40 AS $$MAX(network.eth>$MAX$0#39]]
1470+
1471+
// Eval[[FROMAGGREGATEMETRICDOUBLE(network.eth0.tx{f}#991,1[INTEGER]) AS $$MAX(network.eth>$MAX$0#996]]
14671472
var eval = as(aggregate.child(), Eval.class);
14681473
assertThat(eval.fields(), hasSize(1));
1469-
14701474
var alias = as(eval.fields().getFirst(), Alias.class);
1471-
var fieldAttr = as(alias.child(), FieldAttribute.class);
1475+
var load = as(alias.child(), FromAggregateMetricDouble.class); // <--- no pushing.
1476+
var fieldAttr = as(load.field(), FieldAttribute.class);
14721477
assertThat(fieldAttr.fieldName().string(), equalTo("network.eth0.tx"));
1473-
var field = as(fieldAttr.field(), FunctionEsField.class);
1474-
var blockLoaderFunctionConfig = as(field.functionConfig(), BlockLoaderFunctionConfig.JustFunction.class);
1475-
assertThat(blockLoaderFunctionConfig.function(), equalTo(BlockLoaderFunctionConfig.Function.AMD_MAX));
1478+
as(fieldAttr.field(), EsField.class);
14761479

1477-
// TODO: modify this comment when unmuting test
1478-
// StubRelation[[@timestamp{f}#15, ..., cluster{f}#16, ..., network.eth0.tx{f}#34, ...,
1479-
// pod{f}#17, ..., $$MAX(network.eth>$MAX$0{r$}#39, $$network.eth0.tx$AMD_MAX$1489455250{f$}#40]]
1480+
// StubRelation[[@timestamp{f}#972, ... ]]
14801481
var stubRelation = as(eval.child(), StubRelation.class);
1481-
assertFalse(stubRelation.output().contains(fieldAttr));
1482-
// EsRelation[k8s-downsampled][@timestamp{f}#15, client.ip{f}#19, cluster{f}#16, e..]
1482+
assertThat(stubRelation.output(), hasItem(fieldAttr));
1483+
1484+
// EsRelation[k8s-downsampled][@timestamp{f}#972, client.ip{f}#976, cluster{f}#973, ..]
14831485
var esRelation = as(inlineJoin.left(), EsRelation.class);
1484-
assertTrue(esRelation.output().contains(fieldAttr));
1486+
assertThat(esRelation.output(), hasItem(fieldAttr));
14851487
}
14861488

14871489
public void testVectorFunctionsWhenFieldMissing() {

0 commit comments

Comments
 (0)