Skip to content

Commit 6b38b5d

Browse files
authored
Optimize sort properties of JoinNode in case of JOIN USING
1 parent 12d8c4e commit 6b38b5d

File tree

3 files changed

+200
-7
lines changed

3 files changed

+200
-7
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
8585
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory;
8686
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
87+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression;
8788
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
8889
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
8990
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -104,6 +105,7 @@
104105
import java.util.Comparator;
105106
import java.util.HashMap;
106107
import java.util.HashSet;
108+
import java.util.LinkedHashMap;
107109
import java.util.List;
108110
import java.util.Map;
109111
import java.util.Objects;
@@ -231,10 +233,51 @@ public List<PlanNode> visitProject(ProjectNode node, PlanContext context) {
231233
ImmutableSet.copyOf(node.getOutputSymbols()).containsAll(childOrdering.getOrderBy());
232234
}
233235
if (childrenNodes.size() == 1) {
236+
PlanNode child = childrenNodes.get(0);
234237
if (containAllSortItem) {
235238
nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
236239
}
237-
node.setChild(childrenNodes.get(0));
240+
241+
// Now the join implement but CROSS is MergeSortJoin, so it can keep order
242+
if (child instanceof JoinNode) {
243+
JoinNode joinNode = (JoinNode) child;
244+
245+
// We only process FULL Join here, other type will be processed in visitJoinNode()
246+
if (joinNode.getJoinType() == JoinNode.JoinType.FULL
247+
&& !joinNode.getAsofCriteria().isPresent()) {
248+
Map<Symbol, Expression> assignmentsMap = node.getAssignments().getMap();
249+
// If these Coalesces are all appear in ProjectNode, the ProjectNode is ordered
250+
int coalescesSize = joinNode.getCriteria().size();
251+
252+
// We use map to memorize Symbol of according Coalesce, use linked to avoid twice query of
253+
// this Map when constructOrderingSchema
254+
Map<Expression, Symbol> orderedCoalesces = new LinkedHashMap<>(coalescesSize);
255+
for (JoinNode.EquiJoinClause clause : joinNode.getCriteria()) {
256+
orderedCoalesces.put(
257+
new CoalesceExpression(
258+
ImmutableList.of(
259+
clause.getLeft().toSymbolReference(),
260+
clause.getRight().toSymbolReference())),
261+
null);
262+
}
263+
264+
for (Map.Entry<Symbol, Expression> assignment : assignmentsMap.entrySet()) {
265+
if (orderedCoalesces.containsKey(assignment.getValue())) {
266+
coalescesSize--;
267+
orderedCoalesces.put(assignment.getValue(), assignment.getKey());
268+
}
269+
}
270+
271+
// All Coalesces appear in ProjectNode
272+
if (coalescesSize == 0) {
273+
nodeOrderingMap.put(
274+
node.getPlanNodeId(),
275+
constructOrderingSchema(new ArrayList<>(orderedCoalesces.values())));
276+
}
277+
}
278+
}
279+
280+
node.setChild(child);
238281
return Collections.singletonList(node);
239282
}
240283

@@ -481,14 +524,36 @@ public List<PlanNode> visitJoin(JoinNode node, PlanContext context) {
481524
rightChildrenNodes.size() == 1,
482525
"The size of right children node of JoinNode should be 1");
483526
}
527+
528+
OrderingScheme leftChildOrdering = nodeOrderingMap.get(node.getLeftChild().getPlanNodeId());
529+
OrderingScheme rightChildOrdering = nodeOrderingMap.get(node.getRightChild().getPlanNodeId());
530+
484531
// For CrossJoinNode, we need to merge children nodes(It's safe for other JoinNodes here since
485532
// the size of their children is always 1.)
486-
node.setLeftChild(
487-
mergeChildrenViaCollectOrMergeSort(
488-
nodeOrderingMap.get(node.getLeftChild().getPlanNodeId()), leftChildrenNodes));
489-
node.setRightChild(
490-
mergeChildrenViaCollectOrMergeSort(
491-
nodeOrderingMap.get(node.getRightChild().getPlanNodeId()), rightChildrenNodes));
533+
node.setLeftChild(mergeChildrenViaCollectOrMergeSort(leftChildOrdering, leftChildrenNodes));
534+
node.setRightChild(mergeChildrenViaCollectOrMergeSort(rightChildOrdering, rightChildrenNodes));
535+
536+
// Now the join implement but CROSS is MergeSortJoin, so it can keep order
537+
if (!node.isCrossJoin() && !node.getAsofCriteria().isPresent()) {
538+
switch (node.getJoinType()) {
539+
case FULL:
540+
// If join type is FULL Join, we will process SortProperties in ProjectNode above this
541+
// node.
542+
break;
543+
case INNER:
544+
case LEFT:
545+
if (ImmutableSet.copyOf(node.getLeftOutputSymbols())
546+
.containsAll(leftChildOrdering.getOrderBy())) {
547+
nodeOrderingMap.put(node.getPlanNodeId(), leftChildOrdering);
548+
}
549+
break;
550+
case RIGHT:
551+
throw new IllegalStateException(
552+
"RIGHT Join should be transformed to LEFT Join in previous process");
553+
default:
554+
throw new UnsupportedOperationException("Unsupported Join Type: " + node.getJoinType());
555+
}
556+
}
492557
return Collections.singletonList(node);
493558
}
494559

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@
7171
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
7272
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction;
7373
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationTableScan;
74+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
7475
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
7576
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.join;
77+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
7678
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
7779
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
7880
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
@@ -635,4 +637,126 @@ public void otherInnerJoinTests() {
635637
+ "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
636638
false);
637639
}
640+
641+
@Test
642+
public void testJoinSortProperties() {
643+
// FULL JOIN
644+
PlanTester planTester = new PlanTester();
645+
sql =
646+
"select * from table1 t1 "
647+
+ "full join table1 t2 using (time, s1)"
648+
+ "full join table1 t3 using (time, s1)";
649+
logicalQueryPlan = planTester.createPlan(sql);
650+
assertPlan(
651+
logicalQueryPlan.getRootNode(),
652+
output(
653+
project(
654+
join(
655+
sort(
656+
project(
657+
join(
658+
sort(tableScan("testdb.table1")),
659+
sort(tableScan("testdb.table1"))))),
660+
sort(tableScan("testdb.table1"))))));
661+
662+
assertPlan(planTester.getFragmentPlan(0), output(project(join(exchange(), exchange()))));
663+
664+
// the sort node above JoinNode has been eliminated
665+
assertPlan(planTester.getFragmentPlan(1), project(join(exchange(), exchange())));
666+
667+
assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(), exchange(), exchange()));
668+
669+
assertPlan(planTester.getFragmentPlan(3), sort(tableScan("testdb.table1")));
670+
671+
assertPlan(planTester.getFragmentPlan(4), sort(tableScan("testdb.table1")));
672+
673+
assertPlan(planTester.getFragmentPlan(5), sort(tableScan("testdb.table1")));
674+
675+
assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), exchange(), exchange()));
676+
677+
assertPlan(planTester.getFragmentPlan(7), sort(tableScan("testdb.table1")));
678+
679+
assertPlan(planTester.getFragmentPlan(8), sort(tableScan("testdb.table1")));
680+
681+
assertPlan(planTester.getFragmentPlan(9), sort(tableScan("testdb.table1")));
682+
683+
assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), exchange(), exchange()));
684+
685+
// LEFT
686+
sql =
687+
"select * from table1 t1 "
688+
+ "left join table1 t2 using (time, s1)"
689+
+ "left join table1 t3 using (time, s1)";
690+
assertLeftOrInner(planTester);
691+
692+
// INNER JOIN
693+
sql =
694+
"select * from table1 t1 "
695+
+ "inner join table1 t2 using (time, s1)"
696+
+ "inner join table1 t3 using (time, s1)";
697+
assertLeftOrInner(planTester);
698+
699+
// RIGHT JOIN
700+
sql =
701+
"select * from table1 t1 "
702+
+ "right join table1 t2 using (time, s1)"
703+
+ "right join table1 t3 using (time, s1)";
704+
logicalQueryPlan = planTester.createPlan(sql);
705+
assertPlan(
706+
logicalQueryPlan.getRootNode(),
707+
output(
708+
join(
709+
sort(tableScan("testdb.table1")),
710+
sort(join(sort(tableScan("testdb.table1")), sort(tableScan("testdb.table1")))))));
711+
712+
assertPlan(planTester.getFragmentPlan(0), output(join(exchange(), exchange())));
713+
714+
assertPlan(planTester.getFragmentPlan(1), mergeSort(exchange(), exchange(), exchange()));
715+
716+
assertPlan(planTester.getFragmentPlan(2), sort(tableScan("testdb.table1")));
717+
718+
assertPlan(planTester.getFragmentPlan(3), sort(tableScan("testdb.table1")));
719+
720+
assertPlan(planTester.getFragmentPlan(4), sort(tableScan("testdb.table1")));
721+
722+
// the sort node above JoinNode has been eliminated
723+
assertPlan(planTester.getFragmentPlan(5), join(exchange(), exchange()));
724+
725+
assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), exchange(), exchange()));
726+
727+
assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), exchange(), exchange()));
728+
}
729+
730+
private void assertLeftOrInner(PlanTester planTester) {
731+
logicalQueryPlan = planTester.createPlan(sql);
732+
assertPlan(
733+
logicalQueryPlan.getRootNode(),
734+
output(
735+
join(
736+
sort(join(sort(tableScan("testdb.table1")), sort(tableScan("testdb.table1")))),
737+
sort(tableScan("testdb.table1")))));
738+
739+
assertPlan(planTester.getFragmentPlan(0), output(join(exchange(), exchange())));
740+
741+
// the sort node above JoinNode has been eliminated
742+
assertPlan(planTester.getFragmentPlan(1), join(exchange(), exchange()));
743+
744+
assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(), exchange(), exchange()));
745+
746+
assertPlan(planTester.getFragmentPlan(3), sort(tableScan("testdb.table1")));
747+
748+
assertPlan(planTester.getFragmentPlan(4), sort(tableScan("testdb.table1")));
749+
750+
assertPlan(planTester.getFragmentPlan(5), sort(tableScan("testdb.table1")));
751+
752+
assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(), exchange(), exchange()));
753+
754+
assertPlan(planTester.getFragmentPlan(7), sort(tableScan("testdb.table1")));
755+
756+
assertPlan(planTester.getFragmentPlan(8), sort(tableScan("testdb.table1")));
757+
758+
assertPlan(planTester.getFragmentPlan(9), sort(tableScan("testdb.table1")));
759+
760+
assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(), exchange(), exchange()));
761+
}
638762
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,10 @@ public static PlanMatchPattern patternRecognition(Consumer<PatternRecognitionMat
516516
return builder.build();
517517
}*/
518518

519+
public static PlanMatchPattern join(PlanMatchPattern left, PlanMatchPattern right) {
520+
return node(JoinNode.class, left, right);
521+
}
522+
519523
public static PlanMatchPattern join(
520524
JoinNode.JoinType type, Consumer<JoinMatcher.Builder> handler) {
521525
JoinMatcher.Builder builder = new JoinMatcher.Builder(type);

0 commit comments

Comments
 (0)