Skip to content

Commit 618e601

Browse files
silundongzabetak
authored andcommitted
[CALCITE-6846] Support basic DPhyp join reorder algorithm
Close apache#4204
1 parent 29452b3 commit 618e601

File tree

9 files changed

+1366
-0
lines changed

9 files changed

+1366
-0
lines changed

core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.calcite.rel.rules;
1818

19+
import org.apache.calcite.linq4j.function.Experimental;
1920
import org.apache.calcite.rel.RelNode;
2021
import org.apache.calcite.rel.core.Aggregate;
2122
import org.apache.calcite.rel.core.Calc;
@@ -816,4 +817,17 @@ private CoreRules() {}
816817
WINDOW_REDUCE_EXPRESSIONS =
817818
ReduceExpressionsRule.WindowReduceExpressionsRule.WindowReduceExpressionsRuleConfig
818819
.DEFAULT.toRule();
820+
821+
/** Rule that flattens a tree of {@link LogicalJoin}s
822+
* into a single {@link HyperGraph} with N inputs. */
823+
@Experimental
824+
public static final JoinToHyperGraphRule JOIN_TO_HYPER_GRAPH =
825+
JoinToHyperGraphRule.Config.DEFAULT.toRule();
826+
827+
/** Rule that re-orders a {@link Join} tree using dphyp algorithm.
828+
*
829+
* @see #JOIN_TO_HYPER_GRAPH */
830+
@Experimental
831+
public static final DphypJoinReorderRule HYPER_GRAPH_OPTIMIZE =
832+
DphypJoinReorderRule.Config.DEFAULT.toRule();
819833
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.calcite.rel.rules;
18+
19+
import org.apache.calcite.linq4j.function.Experimental;
20+
import org.apache.calcite.plan.RelOptCost;
21+
import org.apache.calcite.rel.RelNode;
22+
import org.apache.calcite.rel.core.JoinRelType;
23+
import org.apache.calcite.rel.metadata.RelMetadataQuery;
24+
import org.apache.calcite.rex.RexNode;
25+
import org.apache.calcite.tools.RelBuilder;
26+
27+
import org.checkerframework.checker.nullness.qual.Nullable;
28+
29+
import java.util.HashMap;
30+
import java.util.List;
31+
32+
/**
33+
* The core process of dphyp enumeration algorithm.
34+
*/
35+
@Experimental
36+
public class DpHyp {
37+
38+
private final HyperGraph hyperGraph;
39+
40+
private final HashMap<Long, RelNode> dpTable;
41+
42+
private final RelBuilder builder;
43+
44+
private final RelMetadataQuery mq;
45+
46+
public DpHyp(HyperGraph hyperGraph, RelBuilder builder, RelMetadataQuery relMetadataQuery) {
47+
this.hyperGraph =
48+
hyperGraph.copy(
49+
hyperGraph.getTraitSet(),
50+
hyperGraph.getInputs());
51+
this.dpTable = new HashMap<>();
52+
this.builder = builder;
53+
this.mq = relMetadataQuery;
54+
// make all field name unique and convert the
55+
// HyperEdge condition from RexInputRef to RexInputFieldName
56+
this.hyperGraph.convertHyperEdgeCond(builder);
57+
}
58+
59+
/**
60+
* The entry function of the algorithm. We use a bitmap to represent a leaf node,
61+
* which indicates the position of the corresponding leaf node in {@link HyperGraph}.
62+
*
63+
* <p>After the enumeration is completed, the best join order will be stored
64+
* in the {@link DpHyp#dpTable}.
65+
*/
66+
public void startEnumerateJoin() {
67+
int size = hyperGraph.getInputs().size();
68+
for (int i = 0; i < size; i++) {
69+
long singleNode = LongBitmap.newBitmap(i);
70+
dpTable.put(singleNode, hyperGraph.getInput(i));
71+
hyperGraph.initEdgeBitMap(singleNode);
72+
}
73+
74+
// start enumerating from the second to last
75+
for (int i = size - 2; i >= 0; i--) {
76+
long csg = LongBitmap.newBitmap(i);
77+
long forbidden = csg - 1;
78+
emitCsg(csg);
79+
enumerateCsgRec(csg, forbidden);
80+
}
81+
}
82+
83+
/**
84+
* Given a connected subgraph (csg), enumerate all possible complements subgraph (cmp)
85+
* that do not include anything from the exclusion subset.
86+
*
87+
* <p>Corresponding to EmitCsg in origin paper.
88+
*/
89+
private void emitCsg(long csg) {
90+
long forbidden = csg | LongBitmap.getBvBitmap(csg);
91+
long neighbors = hyperGraph.getNeighborBitmap(csg, forbidden);
92+
93+
LongBitmap.ReverseIterator reverseIterator = new LongBitmap.ReverseIterator(neighbors);
94+
for (long cmp : reverseIterator) {
95+
List<HyperEdge> edges = hyperGraph.connectCsgCmp(csg, cmp);
96+
if (!edges.isEmpty()) {
97+
emitCsgCmp(csg, cmp, edges);
98+
}
99+
// forbidden the nodes that smaller than current cmp when extend cmp, e.g.
100+
// neighbors = {t1, t2}, t1 and t2 are connected.
101+
// when extented t2, we will get (t1, t2)
102+
// when extented t1, we will get (t1, t2) repeated
103+
long newForbidden =
104+
(cmp | LongBitmap.getBvBitmap(cmp)) & neighbors;
105+
newForbidden = newForbidden | forbidden;
106+
enumerateCmpRec(csg, cmp, newForbidden);
107+
}
108+
}
109+
110+
/**
111+
* Given a connected subgraph (csg), expands it recursively by its neighbors.
112+
* If the expanded csg is connected, try to enumerate its cmp (note that for complex hyperedge,
113+
* we only select a single representative node to add to the neighbors, so csg and subNeighbor
114+
* are not necessarily connected. However, it still needs to be expanded to prevent missing
115+
* complex hyperedge). This method is called after the enumeration of csg is completed,
116+
* that is, after {@link DpHyp#emitCsg(long csg)}.
117+
*
118+
* <p>Corresponding to EnumerateCsgRec in origin paper.
119+
*/
120+
private void enumerateCsgRec(long csg, long forbidden) {
121+
long neighbors = hyperGraph.getNeighborBitmap(csg, forbidden);
122+
LongBitmap.SubsetIterator subsetIterator = new LongBitmap.SubsetIterator(neighbors);
123+
for (long subNeighbor : subsetIterator) {
124+
hyperGraph.updateEdgesForUnion(csg, subNeighbor);
125+
long newCsg = csg | subNeighbor;
126+
if (dpTable.containsKey(newCsg)) {
127+
emitCsg(newCsg);
128+
}
129+
}
130+
long newForbidden = forbidden | neighbors;
131+
subsetIterator.reset();
132+
for (long subNeighbor : subsetIterator) {
133+
long newCsg = csg | subNeighbor;
134+
enumerateCsgRec(newCsg, newForbidden);
135+
}
136+
}
137+
138+
/**
139+
* Given a connected subgraph (csg) and its complement subgraph (cmp), expands the cmp
140+
* recursively by neighbors of cmp (cmp and subNeighbor are not necessarily connected,
141+
* which is the same logic as in {@link DpHyp#enumerateCsgRec}).
142+
*
143+
* <p>Corresponding to EnumerateCmpRec in origin paper.
144+
*/
145+
private void enumerateCmpRec(long csg, long cmp, long forbidden) {
146+
long neighbors = hyperGraph.getNeighborBitmap(cmp, forbidden);
147+
LongBitmap.SubsetIterator subsetIterator = new LongBitmap.SubsetIterator(neighbors);
148+
for (long subNeighbor : subsetIterator) {
149+
long newCmp = cmp | subNeighbor;
150+
hyperGraph.updateEdgesForUnion(cmp, subNeighbor);
151+
if (dpTable.containsKey(newCmp)) {
152+
List<HyperEdge> edges = hyperGraph.connectCsgCmp(csg, newCmp);
153+
if (!edges.isEmpty()) {
154+
emitCsgCmp(csg, newCmp, edges);
155+
}
156+
}
157+
}
158+
long newForbidden = forbidden | neighbors;
159+
subsetIterator.reset();
160+
for (long subNeighbor : subsetIterator) {
161+
long newCmp = cmp | subNeighbor;
162+
enumerateCmpRec(csg, newCmp, newForbidden);
163+
}
164+
}
165+
166+
/**
167+
* Given a connected csg-cmp pair and the hyperedges that connect them, build the
168+
* corresponding Join plan. If the new Join plan is better than the existing plan,
169+
* update the {@link DpHyp#dpTable}.
170+
*
171+
* <p>Corresponding to EmitCsgCmp in origin paper.
172+
*/
173+
private void emitCsgCmp(long csg, long cmp, List<HyperEdge> edges) {
174+
RelNode child1 = dpTable.get(csg);
175+
RelNode child2 = dpTable.get(cmp);
176+
if (child1 == null || child2 == null) {
177+
throw new IllegalArgumentException(
178+
"csg and cmp were not enumerated in the previous dp process");
179+
}
180+
181+
JoinRelType joinType = hyperGraph.extractJoinType(edges);
182+
if (joinType == null) {
183+
return;
184+
}
185+
RexNode joinCond1 = hyperGraph.extractJoinCond(child1, child2, edges);
186+
RelNode newPlan1 = builder
187+
.push(child1)
188+
.push(child2)
189+
.join(joinType, joinCond1)
190+
.build();
191+
192+
// swap left and right
193+
RexNode joinCond2 = hyperGraph.extractJoinCond(child2, child1, edges);
194+
RelNode newPlan2 = builder
195+
.push(child2)
196+
.push(child1)
197+
.join(joinType, joinCond2)
198+
.build();
199+
RelNode winPlan = chooseBetterPlan(newPlan1, newPlan2);
200+
201+
RelNode oriPlan = dpTable.get(csg | cmp);
202+
if (oriPlan != null) {
203+
winPlan = chooseBetterPlan(winPlan, oriPlan);
204+
}
205+
dpTable.put(csg | cmp, winPlan);
206+
}
207+
208+
public @Nullable RelNode getBestPlan() {
209+
int size = hyperGraph.getInputs().size();
210+
long wholeGraph = LongBitmap.newBitmapBetween(0, size);
211+
return dpTable.get(wholeGraph);
212+
}
213+
214+
private RelNode chooseBetterPlan(RelNode plan1, RelNode plan2) {
215+
RelOptCost cost1 = mq.getCumulativeCost(plan1);
216+
RelOptCost cost2 = mq.getCumulativeCost(plan2);
217+
if (cost1 != null && cost2 != null) {
218+
return cost1.isLt(cost2) ? plan1 : plan2;
219+
} else if (cost1 != null) {
220+
return plan1;
221+
} else {
222+
return plan2;
223+
}
224+
}
225+
226+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.calcite.rel.rules;
18+
19+
import org.apache.calcite.linq4j.function.Experimental;
20+
import org.apache.calcite.plan.RelOptRuleCall;
21+
import org.apache.calcite.plan.RelRule;
22+
import org.apache.calcite.rel.RelNode;
23+
import org.apache.calcite.rel.core.Join;
24+
import org.apache.calcite.rex.RexBuilder;
25+
import org.apache.calcite.rex.RexNode;
26+
import org.apache.calcite.tools.RelBuilder;
27+
28+
import org.immutables.value.Value;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/** Rule that re-orders a {@link Join} tree using dphyp algorithm.
34+
*
35+
* @see CoreRules#HYPER_GRAPH_OPTIMIZE */
36+
@Value.Enclosing
37+
@Experimental
38+
public class DphypJoinReorderRule
39+
extends RelRule<DphypJoinReorderRule.Config>
40+
implements TransformationRule {
41+
42+
protected DphypJoinReorderRule(Config config) {
43+
super(config);
44+
}
45+
46+
@Override public void onMatch(RelOptRuleCall call) {
47+
HyperGraph hyperGraph = call.rel(0);
48+
RelBuilder relBuilder = call.builder();
49+
50+
// enumerate by Dphyp
51+
DpHyp dpHyp = new DpHyp(hyperGraph, relBuilder, call.getMetadataQuery());
52+
dpHyp.startEnumerateJoin();
53+
RelNode orderedJoin = dpHyp.getBestPlan();
54+
if (orderedJoin == null) {
55+
return;
56+
}
57+
58+
// permute field to origin order
59+
List<String> oriNames = hyperGraph.getRowType().getFieldNames();
60+
List<String> newNames = orderedJoin.getRowType().getFieldNames();
61+
List<RexNode> projects = new ArrayList<>();
62+
RexBuilder rexBuilder = hyperGraph.getCluster().getRexBuilder();
63+
for (String oriName : oriNames) {
64+
projects.add(rexBuilder.makeInputRef(orderedJoin, newNames.indexOf(oriName)));
65+
}
66+
67+
RelNode result = call.builder()
68+
.push(orderedJoin)
69+
.project(projects)
70+
.build();
71+
call.transformTo(result);
72+
}
73+
74+
/** Rule configuration. */
75+
@Value.Immutable
76+
public interface Config extends RelRule.Config {
77+
Config DEFAULT = ImmutableDphypJoinReorderRule.Config.of()
78+
.withOperandSupplier(b1 ->
79+
b1.operand(HyperGraph.class).anyInputs());
80+
81+
@Override default DphypJoinReorderRule toRule() {
82+
return new DphypJoinReorderRule(this);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)