Skip to content

Commit db493f2

Browse files
committed
HIVE-29322: Avoid TopNKeyOperator When Map-Side LIMIT Pushdown Provides Better Pruning
1 parent 7a7596f commit db493f2

File tree

76 files changed

+1930
-2907
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+1930
-2907
lines changed

ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@
2525
import java.util.Stack;
2626

2727
import org.apache.hadoop.hive.conf.HiveConf;
28-
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
29-
import org.apache.hadoop.hive.ql.exec.LimitOperator;
30-
import org.apache.hadoop.hive.ql.exec.Operator;
31-
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
28+
import org.apache.hadoop.hive.ql.exec.*;
3229
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
3330
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
3431
import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
@@ -43,6 +40,7 @@
4340
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
4441
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
4542
import org.apache.hadoop.hive.ql.plan.LimitDesc;
43+
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
4644

4745
/**
4846
* Make RS calculate top-K selection for limit clause.
@@ -117,18 +115,30 @@ private static class TopNReducer implements SemanticNodeProcessor {
117115
@Override
118116
public Object process(Node nd, Stack<Node> stack,
119117
NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
118+
boolean hasOnlyOrderByLimit = true;
120119
ReduceSinkOperator rs = null;
121120
for (int i = stack.size() - 2 ; i >= 0; i--) {
122121
Operator<?> operator = (Operator<?>) stack.get(i);
123-
if (operator.getNumChild() != 1) {
124-
return false; // multi-GBY single-RS (TODO)
125-
}
126-
if (operator instanceof ReduceSinkOperator) {
127-
rs = (ReduceSinkOperator) operator;
128-
break;
122+
123+
if (operator instanceof GroupByOperator || operator instanceof JoinOperator) {
124+
hasOnlyOrderByLimit = false;
125+
if (rs != null) {
126+
break;
127+
}
129128
}
130-
if (!operator.acceptLimitPushdown()) {
131-
return false;
129+
130+
if (rs == null) {
131+
if (operator.getNumChild() != 1) {
132+
return false; // multi-GBY single-RS (TODO)
133+
}
134+
135+
if (operator instanceof ReduceSinkOperator) {
136+
rs = (ReduceSinkOperator) operator;
137+
continue;
138+
}
139+
if (!operator.acceptLimitPushdown()) {
140+
return false;
141+
}
132142
}
133143
}
134144
if (rs != null) {
@@ -149,6 +159,7 @@ public Object process(Node nd, Stack<Node> stack,
149159
Integer offset = limitDesc.getOffset();
150160
rs.getConf().setTopN(limitDesc.getLimit() + ((offset == null) ? 0 : offset));
151161
rs.getConf().setTopNMemoryUsage(((LimitPushdownContext) procCtx).threshold);
162+
rs.getConf().setHasOnlyOrderByLimit(hasOnlyOrderByLimit);
152163
if (rs.getNumChild() == 1 && rs.getChildren().get(0) instanceof GroupByOperator) {
153164
rs.getConf().setMapGroupBy(true);
154165
}

ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
7171
return null;
7272
}
7373

74+
// Skip the TopNKey optimization for this ReduceSink when it serves a simple global ORDER BY ... LIMIT (topN > -1 and hasOnlyOrderByLimit()).
75+
// This plan structure is pruned more effectively by the ReduceSink top-N that LimitPushdownOptimizer sets up (map-side LIMIT pushdown).
76+
if (reduceSinkDesc.getTopN() > -1 && reduceSinkDesc.hasOnlyOrderByLimit()) {
77+
return null;
78+
}
79+
7480
if (reduceSinkDesc.getTopN() > maxTopNAllowed) {
7581
return null;
7682
}

ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ private ReducerTraits(int trait) {
141141
// used to decide whether global order is needed
142142
private transient boolean hasOrderBy = false;
143143

144+
// set by LimitPushdownOptimizer when no GroupBy/Join operator was seen on the operator stack; when true, TopNKeyProcessor skips this RS
145+
private transient boolean hasOnlyOrderByLimit = false;
146+
144147
private AcidUtils.Operation writeType;
145148

146149
public ReduceSinkDesc() {
@@ -197,6 +200,7 @@ public Object clone() {
197200
desc.reduceTraits = reduceTraits.clone();
198201
desc.setDeduplicated(isDeduplicated);
199202
desc.setHasOrderBy(hasOrderBy);
203+
desc.setHasOnlyOrderByLimit(hasOnlyOrderByLimit);
200204
desc.outputName = outputName;
201205
return desc;
202206
}
@@ -588,6 +592,14 @@ public void setHasOrderBy(boolean hasOrderBy) {
588592
this.hasOrderBy = hasOrderBy;
589593
}
590594

595+
public boolean hasOnlyOrderByLimit() {
596+
return hasOnlyOrderByLimit;
597+
}
598+
599+
public void setHasOnlyOrderByLimit(boolean hasOnlyOrderByLimit) {
600+
this.hasOnlyOrderByLimit = hasOnlyOrderByLimit;
601+
}
602+
591603
// Use LinkedHashSet to give predictable display order.
592604
private static final Set<String> vectorizableReduceSinkNativeEngines =
593605
new LinkedHashSet<String>(Arrays.asList("tez"));

ql/src/test/results/clientpositive/llap/autoColumnStats_4.q.out

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,17 @@ STAGE PLANS:
7474
Filter Operator
7575
predicate: cint is not null (type: boolean)
7676
Statistics: Num rows: 9173 Data size: 671202 Basic stats: COMPLETE Column stats: COMPLETE
77-
Top N Key Operator
78-
sort order: +
79-
keys: cint (type: int)
80-
null sort order: z
81-
Statistics: Num rows: 9173 Data size: 671202 Basic stats: COMPLETE Column stats: COMPLETE
82-
top n: 10
83-
Select Operator
84-
expressions: cint (type: int), CAST( cstring1 AS varchar(128)) (type: varchar(128))
85-
outputColumnNames: _col0, _col1
86-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
87-
Reduce Output Operator
88-
key expressions: _col0 (type: int)
89-
null sort order: z
90-
sort order: +
91-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
92-
value expressions: _col1 (type: varchar(128))
77+
Select Operator
78+
expressions: cint (type: int), CAST( cstring1 AS varchar(128)) (type: varchar(128))
79+
outputColumnNames: _col0, _col1
80+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
81+
Reduce Output Operator
82+
key expressions: _col0 (type: int)
83+
null sort order: z
84+
sort order: +
85+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
86+
TopN Hash Memory Usage: 0.1
87+
value expressions: _col1 (type: varchar(128))
9388
Execution mode: vectorized, llap
9489
LLAP IO: all inputs
9590
Reducer 2
@@ -98,27 +93,27 @@ STAGE PLANS:
9893
Select Operator
9994
expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: varchar(128))
10095
outputColumnNames: _col0, _col1
101-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
96+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
10297
Limit
10398
Number of rows: 10
104-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
99+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
105100
Reduce Output Operator
106101
key expressions: _col0 (type: int)
107102
null sort order: a
108103
sort order: +
109104
Map-reduce partition columns: _col0 (type: int)
110-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
105+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
111106
value expressions: _col1 (type: varchar(128))
112107
Reducer 3
113108
Execution mode: vectorized, llap
114109
Reduce Operator Tree:
115110
Select Operator
116111
expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: varchar(128))
117112
outputColumnNames: _col0, _col1
118-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
113+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
119114
File Output Operator
120115
compressed: false
121-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
116+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
122117
table:
123118
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
124119
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -128,7 +123,7 @@ STAGE PLANS:
128123
Select Operator
129124
expressions: _col0 (type: int), _col1 (type: varchar(128))
130125
outputColumnNames: a, b
131-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
126+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
132127
Group By Operator
133128
aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(b)), avg(COALESCE(length(b),0)), count(b), compute_bit_vector_hll(b)
134129
minReductionHashAggr: 0.9

ql/src/test/results/clientpositive/llap/cbo_SortUnionTransposeRule.q.out

Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -242,17 +242,12 @@ STAGE PLANS:
242242
expressions: key (type: string)
243243
outputColumnNames: _col0
244244
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
245-
Top N Key Operator
246-
sort order: +
247-
keys: _col0 (type: string)
245+
Reduce Output Operator
246+
key expressions: _col0 (type: string)
248247
null sort order: z
248+
sort order: +
249249
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
250-
top n: 5
251-
Reduce Output Operator
252-
key expressions: _col0 (type: string)
253-
null sort order: z
254-
sort order: +
255-
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
250+
TopN Hash Memory Usage: 0.1
256251
Execution mode: vectorized, llap
257252
LLAP IO: all inputs
258253
Map 4
@@ -264,17 +259,12 @@ STAGE PLANS:
264259
expressions: key (type: string)
265260
outputColumnNames: _col0
266261
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
267-
Top N Key Operator
268-
sort order: +
269-
keys: _col0 (type: string)
262+
Reduce Output Operator
263+
key expressions: _col0 (type: string)
270264
null sort order: z
265+
sort order: +
271266
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
272-
top n: 5
273-
Reduce Output Operator
274-
key expressions: _col0 (type: string)
275-
null sort order: z
276-
sort order: +
277-
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
267+
TopN Hash Memory Usage: 0.1
278268
Execution mode: vectorized, llap
279269
LLAP IO: all inputs
280270
Reducer 3
@@ -719,26 +709,22 @@ STAGE PLANS:
719709
TableScan
720710
alias: a
721711
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
722-
Top N Key Operator
723-
sort order: +
724-
keys: key (type: string)
725-
null sort order: z
712+
Select Operator
713+
expressions: key (type: string)
714+
outputColumnNames: _col0
726715
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
727-
top n: 5
728-
Select Operator
729-
expressions: key (type: string)
730-
outputColumnNames: _col0
716+
Reduce Output Operator
717+
key expressions: _col0 (type: string)
718+
null sort order: z
719+
sort order: +
731720
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
732-
Reduce Output Operator
733-
key expressions: _col0 (type: string)
734-
null sort order: z
735-
sort order: +
736-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
737-
Reduce Output Operator
738-
key expressions: _col0 (type: string)
739-
null sort order: z
740-
sort order: +
741-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
721+
TopN Hash Memory Usage: 0.1
722+
Reduce Output Operator
723+
key expressions: _col0 (type: string)
724+
null sort order: z
725+
sort order: +
726+
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
727+
TopN Hash Memory Usage: 0.1
742728
Execution mode: vectorized, llap
743729
LLAP IO: all inputs
744730
Reducer 2
@@ -751,17 +737,12 @@ STAGE PLANS:
751737
Limit
752738
Number of rows: 5
753739
Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE
754-
Top N Key Operator
755-
sort order: +
756-
keys: _col0 (type: string)
740+
Reduce Output Operator
741+
key expressions: _col0 (type: string)
757742
null sort order: z
743+
sort order: +
758744
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
759-
top n: 5
760-
Reduce Output Operator
761-
key expressions: _col0 (type: string)
762-
null sort order: z
763-
sort order: +
764-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
745+
TopN Hash Memory Usage: 0.1
765746
Reducer 4
766747
Execution mode: vectorized, llap
767748
Reduce Operator Tree:
@@ -789,17 +770,12 @@ STAGE PLANS:
789770
Limit
790771
Number of rows: 5
791772
Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE
792-
Top N Key Operator
793-
sort order: +
794-
keys: _col0 (type: string)
773+
Reduce Output Operator
774+
key expressions: _col0 (type: string)
795775
null sort order: z
776+
sort order: +
796777
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
797-
top n: 5
798-
Reduce Output Operator
799-
key expressions: _col0 (type: string)
800-
null sort order: z
801-
sort order: +
802-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
778+
TopN Hash Memory Usage: 0.1
803779
Union 3
804780
Vertex: Union 3
805781

0 commit comments

Comments
 (0)