Skip to content

Commit 0d76655

Browse files
authored
Merge branch 'main' into esql-verifiers
2 parents 21845f3 + 5be4100 commit 0d76655

File tree

15 files changed

+251
-86
lines changed

15 files changed

+251
-86
lines changed

docs/changelog/129370.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 129370
2+
summary: Avoid dropping aggregate groupings in local plans
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129811
7+
- 128054

docs/changelog/130032.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
pr: 130032
2+
summary: ES|QL cross-cluster querying is now generally available
3+
area: ES|QL
4+
type: feature
5+
issues: []
6+
highlight:
7+
title: ES|QL cross-cluster querying is now generally available
8+
body: |-
9+
The ES|QL Cross-Cluster querying feature has been in technical preview since 8.13.
10+
As of releases 8.19.0 and 9.1.0 this is now generally available.
11+
This feature allows you to run ES|QL queries across multiple clusters.
12+
notable: true

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,6 @@ tests:
476476
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
477477
method: testAvailableDiskSpaceMonitorWhenFileSystemStatErrors
478478
issue: https://github.com/elastic/elasticsearch/issues/129149
479-
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
480-
method: testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
481-
issue: https://github.com/elastic/elasticsearch/issues/129148
482479
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
483480
method: test {lookup-join.EnrichLookupStatsBug ASYNC}
484481
issue: https://github.com/elastic/elasticsearch/issues/129228

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,7 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
847847
when(mergeTask2.schedule()).thenReturn(RUN);
848848
boolean task1Runs = randomBoolean();
849849
long currentAvailableBudget = expectedAvailableBudget.get();
850+
// the over-budget here can be larger than the total initial available budget
850851
long overBudget = randomLongBetween(currentAvailableBudget + 1L, currentAvailableBudget + 100L);
851852
long underBudget = randomLongBetween(0L, currentAvailableBudget);
852853
if (task1Runs) {
@@ -882,11 +883,18 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
882883
// update the expected budget given that one task now finished
883884
expectedAvailableBudget.set(expectedAvailableBudget.get() + completedMergeTask.estimatedRemainingMergeSize());
884885
}
885-
// let the test finish cleanly
886-
assertBusy(() -> {
887-
assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(aHasMoreSpace ? 112_500L : 103_000L));
888-
assertThat(threadPoolMergeExecutorService.allDone(), is(true));
889-
});
886+
assertBusy(
887+
() -> assertThat(
888+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
889+
is(aHasMoreSpace ? 112_500L : 103_000L)
890+
)
891+
);
892+
// let the test finish cleanly (some tasks can be over budget even if all the other tasks finished running)
893+
aFileStore.totalSpace = Long.MAX_VALUE;
894+
bFileStore.totalSpace = Long.MAX_VALUE;
895+
aFileStore.usableSpace = Long.MAX_VALUE;
896+
bFileStore.usableSpace = Long.MAX_VALUE;
897+
assertBusy(() -> assertThat(threadPoolMergeExecutorService.allDone(), is(true)));
890898
}
891899
if (setThreadPoolMergeSchedulerSetting) {
892900
assertWarnings(

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,6 +1679,39 @@ public void testQueryOnEmptyDataIndex() {
16791679
}
16801680
}
16811681

1682+
public void testGroupingStatsOnMissingFields() {
1683+
assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long"));
1684+
long oneValue = between(1, 1000);
1685+
indexDoc("missing_field_index", "1", "data", oneValue);
1686+
refresh("missing_field_index");
1687+
QueryPragmas pragmas = randomPragmas();
1688+
pragmas = new QueryPragmas(
1689+
Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
1690+
);
1691+
EsqlQueryRequest request = new EsqlQueryRequest();
1692+
request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color");
1693+
request.pragmas(pragmas);
1694+
try (var r = run(request)) {
1695+
var rows = getValuesList(r);
1696+
assertThat(rows, hasSize(4));
1697+
for (List<Object> row : rows) {
1698+
assertThat(row, hasSize(3));
1699+
}
1700+
assertThat(rows.get(0).get(0), equalTo(20L));
1701+
assertThat(rows.get(0).get(1), equalTo("blue"));
1702+
assertNull(rows.get(0).get(2));
1703+
assertThat(rows.get(1).get(0), equalTo(10L));
1704+
assertThat(rows.get(1).get(1), equalTo("green"));
1705+
assertNull(rows.get(1).get(2));
1706+
assertThat(rows.get(2).get(0), equalTo(30L));
1707+
assertThat(rows.get(2).get(1), equalTo("red"));
1708+
assertNull(rows.get(2).get(2));
1709+
assertThat(rows.get(3).get(0), equalTo(oneValue));
1710+
assertNull(rows.get(3).get(1));
1711+
assertNull(rows.get(3).get(2));
1712+
}
1713+
}
1714+
16821715
private void assertEmptyIndexQueries(String from) {
16831716
try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
16841717
assertFalse(resp.values().hasNext());
@@ -1816,6 +1849,8 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting
18161849
"time",
18171850
"type=long",
18181851
"color",
1852+
"type=keyword",
1853+
"tag",
18191854
"type=keyword"
18201855
)
18211856
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
3131
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
3232
*
33-
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()}
33+
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
34+
* and {@link LogicalPlanOptimizer#cleanup()}
3435
*/
3536
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
3637

@@ -58,8 +59,8 @@ protected List<Batch<LogicalPlan>> batches() {
5859

5960
@SuppressWarnings("unchecked")
6061
private static Batch<LogicalPlan> localOperators() {
61-
var operators = operators();
62-
var rules = operators().rules();
62+
var operators = operators(true);
63+
var rules = operators.rules();
6364
List<Rule<?, LogicalPlan>> newRules = new ArrayList<>(rules.length);
6465

6566
// apply updates to existing rules that have different applicability locally

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,22 @@
8282
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
8383
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
8484
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
85-
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different
85+
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
8686
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
8787
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
8888
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
8989
* after many iterations, although hitting that is considered a bug)</li>
9090
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
9191
* </ul>
9292
*
93-
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
93+
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
9494
* {@link LocalLogicalPlanOptimizer} layer.</p>
9595
*/
9696
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
9797

9898
private static final List<RuleExecutor.Batch<LogicalPlan>> RULES = List.of(
9999
substitutions(),
100-
operators(),
100+
operators(false),
101101
new Batch<>("Skip Compute", new SkipQueryOnLimitZero()),
102102
cleanup(),
103103
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
@@ -160,10 +160,10 @@ protected static Batch<LogicalPlan> substitutions() {
160160
);
161161
}
162162

163-
protected static Batch<LogicalPlan> operators() {
163+
protected static Batch<LogicalPlan> operators(boolean local) {
164164
return new Batch<>(
165165
"Operator Optimization",
166-
new CombineProjections(),
166+
new CombineProjections(local),
167167
new CombineEvals(),
168168
new PruneEmptyPlans(),
169169
new PropagateEmptyRelation(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

Lines changed: 89 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,24 @@
1818
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1919
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
2020
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
21+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2122
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2223
import org.elasticsearch.xpack.esql.plan.logical.Project;
2324
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2425

2526
import java.util.ArrayList;
27+
import java.util.HashSet;
2628
import java.util.LinkedHashSet;
2729
import java.util.List;
30+
import java.util.Set;
2831

2932
public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
33+
// don't drop groupings from a local plan, as the layout has already been agreed upon
34+
private final boolean local;
3035

31-
public CombineProjections() {
36+
public CombineProjections(boolean local) {
3237
super(OptimizerRules.TransformDirection.UP);
38+
this.local = local;
3339
}
3440

3541
@Override
@@ -60,27 +66,91 @@ protected LogicalPlan rule(UnaryPlan plan) {
6066
return plan;
6167
}
6268

63-
// Agg with underlying Project (group by on sub-queries)
64-
if (plan instanceof Aggregate a) {
65-
if (child instanceof Project p) {
66-
var groupings = a.groupings();
67-
List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size());
68-
for (Expression grouping : groupings) {
69-
if (grouping instanceof Attribute attribute) {
70-
groupingAttrs.add(attribute);
71-
} else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) {
72-
groupingAttrs.add(as);
69+
if (plan instanceof Aggregate a && child instanceof Project p) {
70+
var groupings = a.groupings();
71+
72+
// sanity checks
73+
for (Expression grouping : groupings) {
74+
if ((grouping instanceof Attribute
75+
|| grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) == false) {
76+
// After applying ReplaceAggregateNestedExpressionWithEval,
77+
// evaluatable groupings can only contain attributes.
78+
throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping);
79+
}
80+
}
81+
assert groupings.size() <= 1
82+
|| groupings.stream()
83+
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
84+
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
85+
86+
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
87+
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
88+
for (NamedExpression ne : p.projections()) {
89+
// Record the aliases.
90+
// Projections are just aliases for attributes, so casting is safe.
91+
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
92+
}
93+
var aliases = aliasesBuilder.build();
94+
95+
// Propagate any renames from the lower projection into the upper groupings.
96+
List<Expression> resolvedGroupings = new ArrayList<>();
97+
for (Expression grouping : groupings) {
98+
Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as));
99+
resolvedGroupings.add(transformed);
100+
}
101+
102+
// This can lead to duplicates in the groupings: e.g.
103+
// | EVAL x = y | STATS ... BY x, y
104+
if (local) {
105+
// On the data node, the groupings must be preserved because they affect the physical output (see
106+
// AbstractPhysicalOperationProviders#intermediateAttributes).
107+
// In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place
108+
// of the original projection to create new attributes for the duplicate groups.
109+
Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size());
110+
List<Expression> newGroupings = new ArrayList<>();
111+
List<Alias> aliasesAgainstDuplication = new ArrayList<>();
112+
113+
for (int i = 0; i < groupings.size(); i++) {
114+
Expression resolvedGrouping = resolvedGroupings.get(i);
115+
if (seenResolvedGroupings.add(resolvedGrouping)) {
116+
newGroupings.add(resolvedGrouping);
73117
} else {
74-
// After applying ReplaceAggregateNestedExpressionWithEval,
75-
// evaluatable groupings can only contain attributes.
76-
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
118+
// resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to.
119+
// should really only be 1 attribute, anyway, but going via .references() includes the case of a
120+
// GroupingFunction.NonEvaluatableGroupingFunction.
121+
Attribute coreAttribute = resolvedGrouping.references().iterator().next();
122+
123+
Alias renameAgainstDuplication = new Alias(
124+
coreAttribute.source(),
125+
TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name()),
126+
coreAttribute
127+
);
128+
aliasesAgainstDuplication.add(renameAgainstDuplication);
129+
130+
// propagate the new alias into the new grouping
131+
AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder();
132+
resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute());
133+
AttributeMap<Attribute> resolver = resolverBuilder.build();
134+
135+
newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr)));
77136
}
78137
}
79-
plan = a.with(
80-
p.child(),
81-
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
82-
combineProjections(a.aggregates(), p.projections())
83-
);
138+
139+
LogicalPlan newChild = aliasesAgainstDuplication.isEmpty()
140+
? p.child()
141+
: new Eval(p.source(), p.child(), aliasesAgainstDuplication);
142+
plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections()));
143+
} else {
144+
// On the coordinator, we can just discard the duplicates.
145+
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which
146+
// will be an alias like `c = CATEGORIZE(attribute)`.
147+
// Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on
148+
// regular equality (i.e. based on names) instead of name ids.
149+
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g.
150+
// for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also
151+
// applies in the local case below.
152+
List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings));
153+
plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections()));
84154
}
85155
}
86156

@@ -143,39 +213,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
143213
return replaced;
144214
}
145215

146-
private static List<Expression> combineUpperGroupingsAndLowerProjections(
147-
List<? extends NamedExpression> upperGroupings,
148-
List<? extends NamedExpression> lowerProjections
149-
) {
150-
assert upperGroupings.size() <= 1
151-
|| upperGroupings.stream()
152-
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
153-
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
154-
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
155-
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
156-
for (NamedExpression ne : lowerProjections) {
157-
// Record the aliases.
158-
// Projections are just aliases for attributes, so casting is safe.
159-
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
160-
}
161-
var aliases = aliasesBuilder.build();
162-
163-
// Propagate any renames from the lower projection into the upper groupings.
164-
// This can lead to duplicates: e.g.
165-
// | EVAL x = y | STATS ... BY x, y
166-
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be
167-
// an alias like `c = CATEGORIZE(attribute)`.
168-
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
169-
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for
170-
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
171-
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
172-
for (NamedExpression ne : upperGroupings) {
173-
NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a));
174-
resolvedGroupings.add(transformed);
175-
}
176-
return new ArrayList<>(resolvedGroupings);
177-
}
178-
179216
/**
180217
* Replace grouping alias previously contained in the aggregations that might have been projected away.
181218
*/

0 commit comments

Comments
 (0)