|
17 | 17 | import org.elasticsearch.xpack.esql.core.expression.NamedExpression; |
18 | 18 | import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; |
19 | 19 | import org.elasticsearch.xpack.esql.plan.logical.Aggregate; |
| 20 | +import org.elasticsearch.xpack.esql.plan.logical.Eval; |
20 | 21 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; |
21 | 22 | import org.elasticsearch.xpack.esql.plan.logical.Project; |
22 | 23 | import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; |
23 | 24 |
|
24 | 25 | import java.util.ArrayList; |
| 26 | +import java.util.HashSet; |
25 | 27 | import java.util.LinkedHashSet; |
26 | 28 | import java.util.List; |
| 29 | +import java.util.Set; |
27 | 30 |
|
28 | 31 | public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> { |
| 32 | + // don't drop groupings from a local plan, as the layout has already been agreed upon |
| 33 | + private final boolean local; |
29 | 34 |
|
30 | | - public CombineProjections() { |
| 35 | + public CombineProjections(boolean local) { |
31 | 36 | super(OptimizerRules.TransformDirection.UP); |
| 37 | + this.local = local; |
32 | 38 | } |
33 | 39 |
|
34 | 40 | @Override |
@@ -59,29 +65,89 @@ protected LogicalPlan rule(UnaryPlan plan) { |
59 | 65 | return plan; |
60 | 66 | } |
61 | 67 |
|
62 | | - // Agg with underlying Project (group by on sub-queries) |
63 | | - if (plan instanceof Aggregate a) { |
64 | | - if (child instanceof Project p) { |
65 | | - var groupings = a.groupings(); |
66 | | - List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size()); |
67 | | - for (Expression grouping : groupings) { |
68 | | - if (grouping instanceof Attribute attribute) { |
69 | | - groupingAttrs.add(attribute); |
70 | | - } else if (grouping instanceof Alias as && as.child() instanceof Categorize) { |
71 | | - groupingAttrs.add(as); |
| 68 | + if (plan instanceof Aggregate a && child instanceof Project p) { |
| 69 | + var groupings = a.groupings(); |
| 70 | + |
| 71 | + // sanity checks |
| 72 | + for (Expression grouping : groupings) { |
| 73 | + if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) { |
| 74 | + // After applying ReplaceAggregateNestedExpressionWithEval, |
| 75 | + // evaluatable groupings can only contain attributes. |
| 76 | + throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping); |
| 77 | + } |
| 78 | + } |
| 79 | + assert groupings.size() <= 1 |
| 80 | + || groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false |
| 81 | + : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; |
| 82 | + |
| 83 | + // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc.) |
| 84 | + AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder(); |
| 85 | + for (NamedExpression ne : p.projections()) { |
| 86 | + // Record the aliases. |
| 87 | + // Projections are just aliases for attributes, so casting is safe. |
| 88 | + aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
| 89 | + } |
| 90 | + var aliases = aliasesBuilder.build(); |
| 91 | + |
| 92 | + // Propagate any renames from the lower projection into the upper groupings. |
| 93 | + List<Expression> resolvedGroupings = new ArrayList<>(); |
| 94 | + for (Expression grouping : groupings) { |
| 95 | + Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as)); |
| 96 | + resolvedGroupings.add(transformed); |
| 97 | + } |
| 98 | + |
| 99 | + // This can lead to duplicates in the groupings: e.g. |
| 100 | + // | EVAL x = y | STATS ... BY x, y |
| 101 | + if (local) { |
| 102 | + // On the data node, the groupings must be preserved because they affect the physical output (see |
| 103 | + // AbstractPhysicalOperationProviders#intermediateAttributes). |
| 104 | + // If propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place |
| 105 | + // of the original projection to create new attributes for the duplicate groups. |
| 106 | + Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size()); |
| 107 | + List<Expression> newGroupings = new ArrayList<>(); |
| 108 | + List<Alias> aliasesAgainstDuplication = new ArrayList<>(); |
| 109 | + |
| 110 | + for (int i = 0; i < groupings.size(); i++) { |
| 111 | + Expression resolvedGrouping = resolvedGroupings.get(i); |
| 112 | + if (seenResolvedGroupings.add(resolvedGrouping)) { |
| 113 | + newGroupings.add(resolvedGrouping); |
72 | 114 | } else { |
73 | | - // After applying ReplaceAggregateNestedExpressionWithEval, |
74 | | - // groupings (except Categorize) can only contain attributes. |
75 | | - throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping); |
| 115 | + // resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to. |
| 116 | + // should really only be 1 attribute, anyway, but going via .references() includes the case of a |
| 117 | + // GroupingFunction.NonEvaluatableGroupingFunction. |
| 118 | + Attribute coreAttribute = resolvedGrouping.references().iterator().next(); |
| 119 | + |
| 120 | + Alias renameAgainstDuplication = new Alias( |
| 121 | + coreAttribute.source(), |
| 122 | + TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"), |
| 123 | + coreAttribute |
| 124 | + ); |
| 125 | + aliasesAgainstDuplication.add(renameAgainstDuplication); |
| 126 | + |
| 127 | + // propagate the new alias into the new grouping |
| 128 | + AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder(); |
| 129 | + resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute()); |
| 130 | + AttributeMap<Attribute> resolver = resolverBuilder.build(); |
| 131 | + |
| 132 | + newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr))); |
76 | 133 | } |
77 | 134 | } |
78 | | - plan = new Aggregate( |
79 | | - a.source(), |
80 | | - p.child(), |
81 | | - a.aggregateType(), |
82 | | - combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()), |
83 | | - combineProjections(a.aggregates(), p.projections()) |
84 | | - ); |
| 135 | + |
| 136 | + LogicalPlan newChild = aliasesAgainstDuplication.isEmpty() |
| 137 | + ? p.child() |
| 138 | + : new Eval(p.source(), p.child(), aliasesAgainstDuplication); |
| 139 | + plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections())); |
| 140 | + } else { |
| 141 | + // On the coordinator, we can just discard the duplicates. |
| 142 | + // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which |
| 143 | + // will be an alias like `c = CATEGORIZE(attribute)`. |
| 144 | + // Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on |
| 145 | + // regular equality (i.e. based on names) instead of name ids. |
| 146 | + // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. |
| 147 | + // for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also |
| 148 | + // applies in the local case above. |
| 149 | + List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings)); |
| 150 | + plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections())); |
85 | 151 | } |
86 | 152 | } |
87 | 153 |
|
@@ -141,37 +207,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr |
141 | 207 | return replaced; |
142 | 208 | } |
143 | 209 |
|
144 | | - private static List<Expression> combineUpperGroupingsAndLowerProjections( |
145 | | - List<? extends NamedExpression> upperGroupings, |
146 | | - List<? extends NamedExpression> lowerProjections |
147 | | - ) { |
148 | | - assert upperGroupings.size() <= 1 |
149 | | - || upperGroupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false |
150 | | - : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; |
151 | | - // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) |
152 | | - AttributeMap<Attribute> aliases = new AttributeMap<>(); |
153 | | - for (NamedExpression ne : lowerProjections) { |
154 | | - // Record the aliases. |
155 | | - // Projections are just aliases for attributes, so casting is safe. |
156 | | - aliases.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
157 | | - } |
158 | | - |
159 | | - // Propagate any renames from the lower projection into the upper groupings. |
160 | | - // This can lead to duplicates: e.g. |
161 | | - // | EVAL x = y | STATS ... BY x, y |
162 | | - // All substitutions happen before; groupings must be attributes at this point except for CATEGORIZE which will be an alias like |
163 | | - // `c = CATEGORIZE(attribute)`. |
164 | | - // Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet). |
165 | | - // TODO: The deduplication based on simple equality will be insufficient in case of multiple CATEGORIZEs, e.g. for |
166 | | - // `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. |
167 | | - LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>(); |
168 | | - for (NamedExpression ne : upperGroupings) { |
169 | | - NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a)); |
170 | | - resolvedGroupings.add(transformed); |
171 | | - } |
172 | | - return new ArrayList<>(resolvedGroupings); |
173 | | - } |
174 | | - |
175 | 210 | /** |
176 | 211 | * Replace grouping alias previously contained in the aggregations that might have been projected away. |
177 | 212 | */ |
|
0 commit comments