Skip to content

Commit 7e16ad5

Browse files
committed
HIVE-29322: Avoid TopNKeyOperator When ReduceSink TopNkey Filtering Provides Better Pruning
1 parent 314e67f commit 7e16ad5

File tree

74 files changed

+1908
-2850
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1908
-2850
lines changed

ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ public Object process(Node nd, Stack<Node> stack,
198198
}
199199
currentOp = currentOp.getChildOperators().get(0);
200200
}
201+
// Skip RS copy for plans that are already TopNKey-optimized to avoid re-propagating
202+
// TopNKey configuration into the same ReduceSink.
203+
if (((LimitPushdownContext)procCtx).disallowRSCopy && foundGroupByOperator) {
204+
return false;
205+
}
201206
List<ExprNodeDesc> cKeys = cRS.getConf().getKeyCols();
202207
List<ExprNodeDesc> pKeys = pRS.getConf().getKeyCols();
203208
if (pRS.getChildren().get(0) instanceof GroupByOperator &&
@@ -250,8 +255,12 @@ private static class LimitPushdownContext implements NodeProcessorCtx {
250255

251256
private final float threshold;
252257

258+
private final boolean disallowRSCopy;
259+
253260
public LimitPushdownContext(HiveConf conf) throws SemanticException {
254261
threshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_LIMIT_PUSHDOWN_MEMORY_USAGE);
262+
disallowRSCopy = conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_TOPNKEY) &&
263+
!conf.getBoolVar(HiveConf.ConfVars.HIVE_MAPSIDE_AGGREGATE);
255264
if (threshold <= 0 || threshold >= 1) {
256265
throw new SemanticException("Invalid memory usage value " + threshold +
257266
" for " + HiveConf.ConfVars.HIVE_LIMIT_PUSHDOWN_MEMORY_USAGE);

ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,26 +45,7 @@
4545
import org.apache.hadoop.hive.llap.LlapHiveUtils;
4646
import org.apache.hadoop.hive.ql.Context;
4747
import org.apache.hadoop.hive.ql.QueryState;
48-
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
49-
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
50-
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
51-
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
52-
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
53-
import org.apache.hadoop.hive.ql.exec.FilterOperator;
54-
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
55-
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
56-
import org.apache.hadoop.hive.ql.exec.JoinOperator;
57-
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
58-
import org.apache.hadoop.hive.ql.exec.Operator;
59-
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
60-
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
61-
import org.apache.hadoop.hive.ql.exec.SelectOperator;
62-
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
63-
import org.apache.hadoop.hive.ql.exec.Task;
64-
import org.apache.hadoop.hive.ql.exec.TerminalOperator;
65-
import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
66-
import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
67-
import org.apache.hadoop.hive.ql.exec.UnionOperator;
48+
import org.apache.hadoop.hive.ql.exec.*;
6849
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
6950
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
7051
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -1300,9 +1281,10 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
13001281
return;
13011282
}
13021283

1284+
String topNKeyRegexPattern = buildTopNKeyRegexPattern(procCtx);
13031285
Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
13041286
opRules.put(
1305-
new RuleRegExp("Top n key optimization", ReduceSinkOperator.getOperatorName() + "%"),
1287+
new RuleRegExp("Top n key optimization", topNKeyRegexPattern),
13061288
new TopNKeyProcessor(
13071289
HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED),
13081290
HiveConf.getFloatVar(procCtx.conf, ConfVars.HIVE_TOPN_EFFICIENCY_THRESHOLD),
@@ -1322,6 +1304,54 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
13221304
ogw.startWalking(topNodes, null);
13231305
}
13241306

1307+
/*
1308+
* Build the ReduceSink matching pattern used by TopNKey optimization.
1309+
*
1310+
* For ORDER BY / LIMIT queries that do not involve GROUP BY or JOIN,
1311+
* applying TopNKey results in a performance regression. ReduceSink
1312+
* operators created only for ordering must therefore be excluded from
1313+
* TopNKey.
1314+
*
1315+
* When ORDER BY or LIMIT is present, restrict TopNKey to ReduceSink
1316+
* operators that originate from GROUP BY, JOIN, MAPJOIN, LATERAL VIEW
1317+
* JOIN or PTF query shapes. SELECT and FILTER operators may appear in
1318+
* between.
1319+
*/
1320+
private static String buildTopNKeyRegexPattern(OptimizeTezProcContext procCtx) {
1321+
String reduceSinkOp = ReduceSinkOperator.getOperatorName() + "%";
1322+
1323+
boolean hasOrderOrLimit =
1324+
procCtx.parseContext.getQueryProperties().hasLimit() ||
1325+
procCtx.parseContext.getQueryProperties().hasOrderBy();
1326+
1327+
if (hasPTFReduceSink(procCtx) || !hasOrderOrLimit) {
1328+
return reduceSinkOp;
1329+
}
1330+
1331+
return "("
1332+
+ GroupByOperator.getOperatorName() + "|"
1333+
+ PTFOperator.getOperatorName() + "|"
1334+
+ JoinOperator.getOperatorName() + "|"
1335+
+ MapJoinOperator.getOperatorName() + "|"
1336+
+ LateralViewJoinOperator.getOperatorName() + "|"
1337+
+ CommonMergeJoinOperator.getOperatorName()
1338+
+ ")"
1339+
+ "(%("
1340+
+ SelectOperator.getOperatorName() + "|"
1341+
+ FilterOperator.getOperatorName()
1342+
+ "))*%"
1343+
+ reduceSinkOp;
1344+
}
1345+
1346+
private static boolean hasPTFReduceSink(OptimizeTezProcContext ctx) {
1347+
for (ReduceSinkOperator rs : ctx.visitedReduceSinks) {
1348+
if (rs.getConf().isPTFReduceSink()) {
1349+
return true;
1350+
}
1351+
}
1352+
return false;
1353+
}
1354+
13251355
private boolean findParallelSemiJoinBranch(Operator<?> mapjoin, TableScanOperator bigTableTS,
13261356
ParseContext parseContext,
13271357
Map<ReduceSinkOperator, TableScanOperator> semijoins,

ql/src/test/results/clientpositive/llap/autoColumnStats_4.q.out

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,17 @@ STAGE PLANS:
7474
Filter Operator
7575
predicate: cint is not null (type: boolean)
7676
Statistics: Num rows: 9173 Data size: 671202 Basic stats: COMPLETE Column stats: COMPLETE
77-
Top N Key Operator
78-
sort order: +
79-
keys: cint (type: int)
80-
null sort order: z
81-
Statistics: Num rows: 9173 Data size: 671202 Basic stats: COMPLETE Column stats: COMPLETE
82-
top n: 10
83-
Select Operator
84-
expressions: cint (type: int), CAST( cstring1 AS varchar(128)) (type: varchar(128))
85-
outputColumnNames: _col0, _col1
86-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
87-
Reduce Output Operator
88-
key expressions: _col0 (type: int)
89-
null sort order: z
90-
sort order: +
91-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
92-
value expressions: _col1 (type: varchar(128))
77+
Select Operator
78+
expressions: cint (type: int), CAST( cstring1 AS varchar(128)) (type: varchar(128))
79+
outputColumnNames: _col0, _col1
80+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
81+
Reduce Output Operator
82+
key expressions: _col0 (type: int)
83+
null sort order: z
84+
sort order: +
85+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
86+
TopN Hash Memory Usage: 0.1
87+
value expressions: _col1 (type: varchar(128))
9388
Execution mode: vectorized, llap
9489
LLAP IO: all inputs
9590
Reducer 2
@@ -98,27 +93,27 @@ STAGE PLANS:
9893
Select Operator
9994
expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: varchar(128))
10095
outputColumnNames: _col0, _col1
101-
Statistics: Num rows: 9173 Data size: 977184 Basic stats: COMPLETE Column stats: COMPLETE
96+
Statistics: Num rows: 9173 Data size: 1479384 Basic stats: COMPLETE Column stats: COMPLETE
10297
Limit
10398
Number of rows: 10
104-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
99+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
105100
Reduce Output Operator
106101
key expressions: _col0 (type: int)
107102
null sort order: a
108103
sort order: +
109104
Map-reduce partition columns: _col0 (type: int)
110-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
105+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
111106
value expressions: _col1 (type: varchar(128))
112107
Reducer 3
113108
Execution mode: vectorized, llap
114109
Reduce Operator Tree:
115110
Select Operator
116111
expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: varchar(128))
117112
outputColumnNames: _col0, _col1
118-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
113+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
119114
File Output Operator
120115
compressed: false
121-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
116+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
122117
table:
123118
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
124119
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -128,7 +123,7 @@ STAGE PLANS:
128123
Select Operator
129124
expressions: _col0 (type: int), _col1 (type: varchar(128))
130125
outputColumnNames: a, b
131-
Statistics: Num rows: 10 Data size: 1296 Basic stats: COMPLETE Column stats: COMPLETE
126+
Statistics: Num rows: 10 Data size: 1728 Basic stats: COMPLETE Column stats: COMPLETE
132127
Group By Operator
133128
aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(b)), avg(COALESCE(length(b),0)), count(b), compute_bit_vector_hll(b)
134129
minReductionHashAggr: 0.9

ql/src/test/results/clientpositive/llap/cbo_SortUnionTransposeRule.q.out

Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -242,17 +242,12 @@ STAGE PLANS:
242242
expressions: key (type: string)
243243
outputColumnNames: _col0
244244
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
245-
Top N Key Operator
246-
sort order: +
247-
keys: _col0 (type: string)
245+
Reduce Output Operator
246+
key expressions: _col0 (type: string)
248247
null sort order: z
248+
sort order: +
249249
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
250-
top n: 5
251-
Reduce Output Operator
252-
key expressions: _col0 (type: string)
253-
null sort order: z
254-
sort order: +
255-
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
250+
TopN Hash Memory Usage: 0.1
256251
Execution mode: vectorized, llap
257252
LLAP IO: all inputs
258253
Map 4
@@ -264,17 +259,12 @@ STAGE PLANS:
264259
expressions: key (type: string)
265260
outputColumnNames: _col0
266261
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
267-
Top N Key Operator
268-
sort order: +
269-
keys: _col0 (type: string)
262+
Reduce Output Operator
263+
key expressions: _col0 (type: string)
270264
null sort order: z
265+
sort order: +
271266
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
272-
top n: 5
273-
Reduce Output Operator
274-
key expressions: _col0 (type: string)
275-
null sort order: z
276-
sort order: +
277-
Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
267+
TopN Hash Memory Usage: 0.1
278268
Execution mode: vectorized, llap
279269
LLAP IO: all inputs
280270
Reducer 3
@@ -719,26 +709,22 @@ STAGE PLANS:
719709
TableScan
720710
alias: a
721711
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
722-
Top N Key Operator
723-
sort order: +
724-
keys: key (type: string)
725-
null sort order: z
712+
Select Operator
713+
expressions: key (type: string)
714+
outputColumnNames: _col0
726715
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
727-
top n: 5
728-
Select Operator
729-
expressions: key (type: string)
730-
outputColumnNames: _col0
716+
Reduce Output Operator
717+
key expressions: _col0 (type: string)
718+
null sort order: z
719+
sort order: +
731720
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
732-
Reduce Output Operator
733-
key expressions: _col0 (type: string)
734-
null sort order: z
735-
sort order: +
736-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
737-
Reduce Output Operator
738-
key expressions: _col0 (type: string)
739-
null sort order: z
740-
sort order: +
741-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
721+
TopN Hash Memory Usage: 0.1
722+
Reduce Output Operator
723+
key expressions: _col0 (type: string)
724+
null sort order: z
725+
sort order: +
726+
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
727+
TopN Hash Memory Usage: 0.1
742728
Execution mode: vectorized, llap
743729
LLAP IO: all inputs
744730
Reducer 2
@@ -751,17 +737,12 @@ STAGE PLANS:
751737
Limit
752738
Number of rows: 5
753739
Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE
754-
Top N Key Operator
755-
sort order: +
756-
keys: _col0 (type: string)
740+
Reduce Output Operator
741+
key expressions: _col0 (type: string)
757742
null sort order: z
743+
sort order: +
758744
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
759-
top n: 5
760-
Reduce Output Operator
761-
key expressions: _col0 (type: string)
762-
null sort order: z
763-
sort order: +
764-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
745+
TopN Hash Memory Usage: 0.1
765746
Reducer 4
766747
Execution mode: vectorized, llap
767748
Reduce Operator Tree:
@@ -789,17 +770,12 @@ STAGE PLANS:
789770
Limit
790771
Number of rows: 5
791772
Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE
792-
Top N Key Operator
793-
sort order: +
794-
keys: _col0 (type: string)
773+
Reduce Output Operator
774+
key expressions: _col0 (type: string)
795775
null sort order: z
776+
sort order: +
796777
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
797-
top n: 5
798-
Reduce Output Operator
799-
key expressions: _col0 (type: string)
800-
null sort order: z
801-
sort order: +
802-
Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE
778+
TopN Hash Memory Usage: 0.1
803779
Union 3
804780
Vertex: Union 3
805781

0 commit comments

Comments
 (0)