|
15 | 15 | import org.elasticsearch.xpack.esql.core.expression.Expression;
|
16 | 16 | import org.elasticsearch.xpack.esql.core.expression.Expressions;
|
17 | 17 | import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
|
| 18 | +import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; |
18 | 19 | import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
|
| 20 | +import org.elasticsearch.xpack.esql.plan.logical.Eval; |
19 | 21 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
20 | 22 | import org.elasticsearch.xpack.esql.plan.logical.Project;
|
21 | 23 | import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
22 | 24 |
|
23 | 25 | import java.util.ArrayList;
|
| 26 | +import java.util.HashSet; |
| 27 | +import java.util.LinkedHashSet; |
24 | 28 | import java.util.List;
|
| 29 | +import java.util.Set; |
25 | 30 |
|
26 | 31 | public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
|
| 32 | + // don't drop groupings from a local plan, as the layout has already been agreed upon |
| 33 | + private final boolean local; |
27 | 34 |
|
28 |
| - public CombineProjections() { |
| 35 | + public CombineProjections(boolean local) { |
29 | 36 | super(OptimizerRules.TransformDirection.UP);
|
| 37 | + this.local = local; |
30 | 38 | }
|
31 | 39 |
|
32 | 40 | @Override
|
@@ -57,26 +65,89 @@ protected LogicalPlan rule(UnaryPlan plan) {
|
57 | 65 | return plan;
|
58 | 66 | }
|
59 | 67 |
|
60 |
| - // Agg with underlying Project (group by on sub-queries) |
61 |
| - if (plan instanceof Aggregate a) { |
62 |
| - if (child instanceof Project p) { |
63 |
| - var groupings = a.groupings(); |
64 |
| - List<Attribute> groupingAttrs = new ArrayList<>(a.groupings().size()); |
65 |
| - for (Expression grouping : groupings) { |
66 |
| - if (grouping instanceof Attribute attribute) { |
67 |
| - groupingAttrs.add(attribute); |
| 68 | + if (plan instanceof Aggregate a && child instanceof Project p) { |
| 69 | + var groupings = a.groupings(); |
| 70 | + |
| 71 | + // sanity checks |
| 72 | + for (Expression grouping : groupings) { |
| 73 | + if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) { |
| 74 | + // After applying ReplaceAggregateNestedExpressionWithEval, |
| 75 | + // evaluatable groupings can only contain attributes. |
| 76 | + throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping); |
| 77 | + } |
| 78 | + } |
| 79 | + assert groupings.size() <= 1 |
| 80 | + || groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false |
| 81 | + : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; |
| 82 | + |
| 83 | + // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc.) |
| 84 | + AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder(); |
| 85 | + for (NamedExpression ne : p.projections()) { |
| 86 | + // Record the aliases. |
| 87 | + // Projections are just aliases for attributes, so casting is safe. |
| 88 | + aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
| 89 | + } |
| 90 | + var aliases = aliasesBuilder.build(); |
| 91 | + |
| 92 | + // Propagate any renames from the lower projection into the upper groupings. |
| 93 | + List<Expression> resolvedGroupings = new ArrayList<>(); |
| 94 | + for (Expression grouping : groupings) { |
| 95 | + Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as)); |
| 96 | + resolvedGroupings.add(transformed); |
| 97 | + } |
| 98 | + |
| 99 | + // This can lead to duplicates in the groupings: e.g. |
| 100 | + // | EVAL x = y | STATS ... BY x, y |
| 101 | + if (local) { |
| 102 | + // On the data node, the groupings must be preserved because they affect the physical output (see |
| 103 | + // AbstractPhysicalOperationProviders#intermediateAttributes). |
| 104 | + // If propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place |
| 105 | + // of the original projection to create new attributes for the duplicate groups. |
| 106 | + Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size()); |
| 107 | + List<Expression> newGroupings = new ArrayList<>(); |
| 108 | + List<Alias> aliasesAgainstDuplication = new ArrayList<>(); |
| 109 | + |
| 110 | + for (int i = 0; i < groupings.size(); i++) { |
| 111 | + Expression resolvedGrouping = resolvedGroupings.get(i); |
| 112 | + if (seenResolvedGroupings.add(resolvedGrouping)) { |
| 113 | + newGroupings.add(resolvedGrouping); |
68 | 114 | } else {
|
69 |
| - // After applying ReplaceAggregateNestedExpressionWithEval, groupings can only contain attributes. |
70 |
| - throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping); |
| 115 | + // resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to. |
| 116 | + // should really only be 1 attribute, anyway, but going via .references() includes the case of a |
| 117 | + // GroupingFunction.NonEvaluatableGroupingFunction. |
| 118 | + Attribute coreAttribute = resolvedGrouping.references().iterator().next(); |
| 119 | + |
| 120 | + Alias renameAgainstDuplication = new Alias( |
| 121 | + coreAttribute.source(), |
| 122 | + TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"), |
| 123 | + coreAttribute |
| 124 | + ); |
| 125 | + aliasesAgainstDuplication.add(renameAgainstDuplication); |
| 126 | + |
| 127 | + // propagate the new alias into the new grouping |
| 128 | + AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder(); |
| 129 | + resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute()); |
| 130 | + AttributeMap<Attribute> resolver = resolverBuilder.build(); |
| 131 | + |
| 132 | + newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr))); |
71 | 133 | }
|
72 | 134 | }
|
73 |
| - plan = new Aggregate( |
74 |
| - a.source(), |
75 |
| - p.child(), |
76 |
| - a.aggregateType(), |
77 |
| - combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()), |
78 |
| - combineProjections(a.aggregates(), p.projections()) |
79 |
| - ); |
| 135 | + |
| 136 | + LogicalPlan newChild = aliasesAgainstDuplication.isEmpty() |
| 137 | + ? p.child() |
| 138 | + : new Eval(p.source(), p.child(), aliasesAgainstDuplication); |
| 139 | + plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections())); |
| 140 | + } else { |
| 141 | + // On the coordinator, we can just discard the duplicates. |
| 142 | + // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which |
| 143 | + // will be an alias like `c = CATEGORIZE(attribute)`. |
| 144 | + // Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on |
| 145 | + // regular equality (i.e. based on names) instead of name ids. |
| 146 | + // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. |
| 147 | + // for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also |
| 148 | + // applies in the local case above. |
| 149 | + List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings)); |
| 150 | + plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections())); |
80 | 151 | }
|
81 | 152 | }
|
82 | 153 |
|
@@ -136,26 +207,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
|
136 | 207 | return replaced;
|
137 | 208 | }
|
138 | 209 |
|
139 |
| - private static List<Expression> combineUpperGroupingsAndLowerProjections( |
140 |
| - List<? extends Attribute> upperGroupings, |
141 |
| - List<? extends NamedExpression> lowerProjections |
142 |
| - ) { |
143 |
| - // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) |
144 |
| - AttributeMap<Attribute> aliases = new AttributeMap<>(); |
145 |
| - for (NamedExpression ne : lowerProjections) { |
146 |
| - // Projections are just aliases for attributes, so casting is safe. |
147 |
| - aliases.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
148 |
| - } |
149 |
| - |
150 |
| - // Replace any matching attribute directly with the aliased attribute from the projection. |
151 |
| - AttributeSet replaced = new AttributeSet(); |
152 |
| - for (Attribute attr : upperGroupings) { |
153 |
| - // All substitutions happen before; groupings must be attributes at this point. |
154 |
| - replaced.add(aliases.resolve(attr, attr)); |
155 |
| - } |
156 |
| - return new ArrayList<>(replaced); |
157 |
| - } |
158 |
| - |
159 | 210 | /**
|
160 | 211 | * Replace grouping alias previously contained in the aggregations that might have been projected away.
|
161 | 212 | */
|
|
0 commit comments