Skip to content

Commit 37388a1

Browse files
authored
implement MergeUnion rule to flatten nested UnionNode (apache#16657)
1 parent 57fe573 commit 37388a1

File tree

4 files changed

+304
-1
lines changed

4 files changed

+304
-1
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
21+
22+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
23+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
24+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
25+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
26+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
27+
28+
import java.util.Optional;
29+
30+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.union;
31+
32+
public class MergeUnion implements Rule<UnionNode> {
33+
34+
private final Pattern<UnionNode> pattern = union();
35+
36+
@Override
37+
public Pattern<UnionNode> getPattern() {
38+
return pattern;
39+
}
40+
41+
@Override
42+
public Result apply(UnionNode node, Captures captures, Context context) {
43+
44+
SetOperationMerge mergeOperation = new SetOperationMerge(node, context);
45+
Optional<SetOperationNode> result = mergeOperation.merge();
46+
return result.map(Result::ofPlanNode).orElseGet(Result::empty);
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
21+
22+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
23+
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
24+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
25+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
26+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
27+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
28+
29+
import com.google.common.collect.ImmutableListMultimap;
30+
import com.google.common.collect.Iterables;
31+
32+
import java.util.ArrayList;
33+
import java.util.Collection;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Optional;
37+
38+
import static com.google.common.base.Preconditions.checkState;
39+
import static com.google.common.collect.ImmutableList.toImmutableList;
40+
41+
public class SetOperationMerge {
42+
43+
private final Rule.Context context;
44+
private final SetOperationNode node;
45+
private final List<PlanNode> newSources;
46+
47+
public SetOperationMerge(SetOperationNode node, Rule.Context context) {
48+
this.node = node;
49+
this.context = context;
50+
this.newSources = new ArrayList<>();
51+
}
52+
53+
// Merge multiple union into one union
54+
public Optional<SetOperationNode> merge() {
55+
56+
checkState(
57+
node instanceof UnionNode, "unexpected node type: %s", node.getClass().getSimpleName());
58+
Lookup lookup = context.getLookup();
59+
// Pre-check
60+
boolean anyMerge =
61+
node.getChildren().stream()
62+
.map(lookup::resolve)
63+
.anyMatch(child -> node.getClass().equals(child.getClass()));
64+
if (!anyMerge) {
65+
return Optional.empty();
66+
}
67+
68+
List<PlanNode> childrenOfUnion =
69+
node.getChildren().stream().map(lookup::resolve).collect(toImmutableList());
70+
71+
ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder =
72+
ImmutableListMultimap.builder();
73+
74+
boolean rewritten = false;
75+
76+
for (int i = 0; i < childrenOfUnion.size(); i++) {
77+
PlanNode child = childrenOfUnion.get(i);
78+
79+
// Determine if set operations can be merged and whether the resulting set operation is
80+
// quantified DISTINCT or ALL
81+
Optional<Boolean> mergedQuantifier = mergedQuantifierIsDistinct(node, child);
82+
if (mergedQuantifier.isPresent()) {
83+
addMergedMappings((SetOperationNode) child, i, newMappingsBuilder);
84+
rewritten = true;
85+
} else {
86+
// Keep mapping as it is
87+
addOriginalMappings(child, i, newMappingsBuilder);
88+
}
89+
}
90+
91+
if (!rewritten) {
92+
return Optional.empty();
93+
}
94+
95+
// the union has merged
96+
return Optional.of(
97+
new UnionNode(
98+
node.getPlanNodeId(), newSources, newMappingsBuilder.build(), node.getOutputSymbols()));
99+
}
100+
101+
private void addMergedMappings(
102+
SetOperationNode child,
103+
int childIndex,
104+
ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
105+
106+
newSources.addAll(child.getChildren());
107+
Map<Symbol, Collection<Symbol>> symbolMappings = node.getSymbolMapping().asMap();
108+
for (Map.Entry<Symbol, Collection<Symbol>> mapping : symbolMappings.entrySet()) {
109+
Symbol input = Iterables.get(mapping.getValue(), childIndex);
110+
newMappingsBuilder.putAll(mapping.getKey(), child.getSymbolMapping().get(input));
111+
}
112+
}
113+
114+
private void addOriginalMappings(
115+
PlanNode child,
116+
int childIndex,
117+
ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
118+
119+
newSources.add(child);
120+
Map<Symbol, Collection<Symbol>> symbolMappings = node.getSymbolMapping().asMap();
121+
for (Map.Entry<Symbol, Collection<Symbol>> mapping : symbolMappings.entrySet()) {
122+
newMappingsBuilder.put(mapping.getKey(), Iterables.get(mapping.getValue(), childIndex));
123+
}
124+
}
125+
126+
/**
127+
* Check if node and child are mergeable based on their set operation type and quantifier.
128+
*
129+
* <p>Optional.empty() indicates that merge is not possible.
130+
*/
131+
private Optional<Boolean> mergedQuantifierIsDistinct(SetOperationNode node, PlanNode child) {
132+
133+
if (!node.getClass().equals(child.getClass())) {
134+
return Optional.empty();
135+
}
136+
137+
if (node instanceof UnionNode) {
138+
return Optional.of(false);
139+
}
140+
141+
// the Judgment logic for intersect and except wait for supplying
142+
return Optional.empty();
143+
}
144+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort;
3434
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort;
3535
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits;
36+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeUnion;
3637
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct;
3738
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.OptimizeRowPattern;
3839
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns;
@@ -207,6 +208,7 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) {
207208
new MergeFilters(),
208209
new InlineProjections(plannerContext),
209210
new RemoveRedundantIdentityProjections(),
211+
new MergeUnion(),
210212
new MergeLimits(),
211213
new RemoveTrivialFilters(),
212214
// new RemoveRedundantLimit(),
@@ -245,7 +247,7 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) {
245247
.addAll(limitPushdownRules)
246248
.addAll(
247249
ImmutableSet.of(
248-
// new MergeUnion(),
250+
new MergeUnion(),
249251
// new RemoveEmptyUnionBranches(),
250252
new MergeFilters(),
251253
new RemoveTrivialFilters(),
@@ -255,6 +257,17 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) {
255257
.build()),
256258
simplifyOptimizer,
257259
new UnaliasSymbolReferences(plannerContext.getMetadata()),
260+
new IterativeOptimizer(
261+
plannerContext,
262+
ruleStats,
263+
ImmutableSet.<Rule<?>>builder()
264+
.addAll(
265+
ImmutableSet.of(
266+
new MergeUnion(),
267+
// new MergeIntersect
268+
// new MergeExcept
269+
new PruneDistinctAggregation()))
270+
.build()),
258271
columnPruningOptimizer,
259272
inlineProjectionLimitFiltersOptimizer,
260273
new IterativeOptimizer(
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
21+
22+
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
23+
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
24+
import org.apache.iotdb.db.queryengine.plan.relational.planner.TableLogicalPlanner;
25+
26+
import org.junit.Test;
27+
28+
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
29+
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.DEFAULT_WARNING;
30+
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.QUERY_CONTEXT;
31+
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.SESSION_INFO;
32+
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.TEST_MATADATA;
33+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
34+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
35+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
36+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
37+
38+
public class MergeUnionTest {
39+
40+
@Test
41+
public void simpleLeftDeepMerge() {
42+
43+
String sql =
44+
"(select tag1 from t1 union all select tag1 from t2) union all select tag1 from t3 ";
45+
Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
46+
SymbolAllocator symbolAllocator = new SymbolAllocator();
47+
LogicalQueryPlan actualLogicalQueryPlan =
48+
new TableLogicalPlanner(
49+
QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, DEFAULT_WARNING)
50+
.plan(analysis);
51+
52+
// just verify the Logical plan `Output - union - 3*tableScan`
53+
assertPlan(
54+
actualLogicalQueryPlan,
55+
output((union(tableScan("testdb.t1"), tableScan("testdb.t2"), tableScan("testdb.t3")))));
56+
}
57+
58+
@Test
59+
public void simpleRightDeepMerge() {
60+
61+
String sql =
62+
"select tag1 from t1 union all (select tag1 from t2 union all select tag1 from t3) ";
63+
Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
64+
SymbolAllocator symbolAllocator = new SymbolAllocator();
65+
LogicalQueryPlan actualLogicalQueryPlan =
66+
new TableLogicalPlanner(
67+
QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, DEFAULT_WARNING)
68+
.plan(analysis);
69+
70+
// just verify the Logical plan `Output - union - 3*tableScan`
71+
assertPlan(
72+
actualLogicalQueryPlan,
73+
output((union(tableScan("testdb.t1"), tableScan("testdb.t2"), tableScan("testdb.t3")))));
74+
}
75+
76+
@Test
77+
public void bushyTreeMerge() {
78+
79+
String sql =
80+
"(select tag1 from t1 union all select tag1 from t2) union all (select tag1 from t3 union all select tag1 from t4) ";
81+
Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
82+
SymbolAllocator symbolAllocator = new SymbolAllocator();
83+
LogicalQueryPlan actualLogicalQueryPlan =
84+
new TableLogicalPlanner(
85+
QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, DEFAULT_WARNING)
86+
.plan(analysis);
87+
88+
// just verify the Logical plan `Output - union - 4*tableScan`
89+
assertPlan(
90+
actualLogicalQueryPlan,
91+
output(
92+
(union(
93+
tableScan("testdb.t1"),
94+
tableScan("testdb.t2"),
95+
tableScan("testdb.t3"),
96+
tableScan("testdb.t4")))));
97+
}
98+
}

0 commit comments

Comments
 (0)