@@ -959,7 +959,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) {

val dataFilters = new ListBuffer[Expr]()
- for (filter <- scan.dataFilters) {
+ for (filter <- scan.supportedDataFilters) {
exprToProto(filter, scan.output) match {
case Some(proto) => dataFilters += proto
case _ =>
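Context for the hunk above: only the filters that survive exprToProto serialization are pushed into the native scan's protobuf plan; anything the serde cannot convert falls through the (elided) default arm and is simply not pushed down. Below is a minimal, self-contained Scala sketch of this accumulate-on-success pattern. ProtoExpr, the string-based toy converter, and the sample filters are hypothetical stand-ins; Comet's real exprToProto takes a Catalyst expression plus the scan's output attributes.

object FilterPushdownSketch extends App {
  import scala.collection.mutable.ListBuffer

  // Hypothetical stand-in for Comet's protobuf expression type.
  case class ProtoExpr(repr: String)

  // Toy converter: None signals an expression the serde cannot translate.
  def exprToProto(filter: String): Option[ProtoExpr] =
    if (filter.startsWith("isnotnull")) Some(ProtoExpr(filter)) else None

  val supportedDataFilters =
    Seq("isnotnull(sr_store_sk)", "some_udf(sr_return_amt)", "isnotnull(sr_customer_sk)")

  // Mirrors the loop in QueryPlanSerde: keep only convertible filters.
  val dataFilters = new ListBuffer[ProtoExpr]()
  for (filter <- supportedDataFilters) {
    exprToProto(filter) match {
      case Some(proto) => dataFilters += proto
      case _ => // not convertible: the filter is simply not pushed down
    }
  }

  // Prints the two isnotnull filters; some_udf(...) was skipped.
  println(dataFilters)
}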
@@ -155,16 +155,21 @@ case class CometScanExec(
}
}

- @transient
- private lazy val pushedDownFilters = {
-   val supportedFilters = if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
-     // `native_datafusion` scan does not support subquery pushdown filters,
-     // see: https://github.com/apache/datafusion-comet/issues/2424
+ /**
+  * Returns the data filters that are supported for this scan implementation. For
+  * native_datafusion scans, this excludes dynamic pruning filters (subqueries)
+  */
+ lazy val supportedDataFilters: Seq[Expression] = {
+   if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
dataFilters.filterNot(isDynamicPruningFilter)
} else {
dataFilters
}
-   getPushedDownFilters(relation, supportedFilters)
}

+ @transient
+ private lazy val pushedDownFilters = {
+   getPushedDownFilters(relation, supportedDataFilters)
+ }
+
override lazy val metadata: Map[String, String] =
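To make the new supportedDataFilters rule concrete, here is a minimal, self-contained sketch under toy types: the Expr hierarchy and DynamicPruning marker are hypothetical stand-ins, while Spark's real isDynamicPruningFilter checks whether any sub-expression is a PlanExpression (a subquery inserted by dynamic partition pruning).

object SupportedFiltersSketch extends App {
  sealed trait Expr
  case class IsNotNull(col: String) extends Expr
  // Stand-in for a dynamic-partition-pruning subquery filter, which the
  // native_datafusion scan cannot evaluate (datafusion-comet issue #2424).
  case class DynamicPruning(col: String) extends Expr

  // Toy version of Spark's isDynamicPruningFilter.
  def isDynamicPruningFilter(e: Expr): Boolean = e.isInstanceOf[DynamicPruning]

  def supportedDataFilters(isNativeDataFusion: Boolean, dataFilters: Seq[Expr]): Seq[Expr] =
    if (isNativeDataFusion) dataFilters.filterNot(isDynamicPruningFilter)
    else dataFilters

  val filters = Seq(IsNotNull("sr_store_sk"), DynamicPruning("sr_returned_date_sk"))
  // native_datafusion: the subquery-based filter is excluded from pushdown...
  assert(supportedDataFilters(isNativeDataFusion = true, filters) == Seq(IsNotNull("sr_store_sk")))
  // ...while other scan implementations receive every data filter.
  assert(supportedDataFilters(isNativeDataFusion = false, filters) == filters)
}

Factoring the exclusion into a single lazy val is the point of the refactor: QueryPlanSerde and getPushedDownFilters now read from the same definition, so the serde can no longer hand the native scan a filter that pushedDownFilters already excluded.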
@@ -1,8 +1,8 @@
== Physical Plan ==
- * ColumnarToRow (40)
- +- CometTakeOrderedAndProject (39)
- +- CometProject (38)
- +- CometBroadcastHashJoin (37)
+ * CometColumnarToRow (41)
+ +- CometTakeOrderedAndProject (40)
+ +- CometProject (39)
+ +- CometBroadcastHashJoin (38)
:- CometProject (33)
: +- CometBroadcastHashJoin (32)
: :- CometProject (27)
@@ -14,11 +14,11 @@
: : : +- CometProject (8)
: : : +- CometBroadcastHashJoin (7)
: : : :- CometFilter (2)
- : : : : +- CometNativeScan: `spark_catalog`.`default`.`store_returns` (1)
+ : : : : +- CometNativeScan parquet spark_catalog.default.store_returns (1)
: : : +- CometBroadcastExchange (6)
: : : +- CometProject (5)
: : : +- CometFilter (4)
- : : : +- CometNativeScan: `spark_catalog`.`default`.`date_dim` (3)
+ : : : +- CometNativeScan parquet spark_catalog.default.date_dim (3)
: : +- CometBroadcastExchange (25)
: : +- CometFilter (24)
: : +- CometHashAggregate (23)
@@ -30,28 +30,36 @@
: : +- CometProject (17)
: : +- CometBroadcastHashJoin (16)
: : :- CometFilter (14)
- : : : +- CometNativeScan: `spark_catalog`.`default`.`store_returns` (13)
+ : : : +- CometNativeScan parquet spark_catalog.default.store_returns (13)
: : +- ReusedExchange (15)
: +- CometBroadcastExchange (31)
: +- CometProject (30)
: +- CometFilter (29)
- : +- CometNativeScan: `spark_catalog`.`default`.`store` (28)
- +- CometBroadcastExchange (36)
- +- CometFilter (35)
- +- CometNativeScan: `spark_catalog`.`default`.`customer` (34)
+ : +- CometNativeScan parquet spark_catalog.default.store (28)
+ +- CometBroadcastExchange (37)
+ +- CometProject (36)
+ +- CometFilter (35)
+ +- CometNativeScan parquet spark_catalog.default.customer (34)


- (1) CometNativeScan: `spark_catalog`.`default`.`store_returns`
+ (1) CometNativeScan parquet spark_catalog.default.store_returns
Output [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
- Arguments: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
+ Batched: true
+ Location: InMemoryFileIndex(0 paths)[]
+ PartitionFilters: [isnotnull(sr_returned_date_sk#4)]
+ PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)]
+ ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(2) CometFilter
Input [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Condition : (isnotnull(sr_store_sk#2) AND isnotnull(sr_customer_sk#1))

- (3) CometNativeScan: `spark_catalog`.`default`.`date_dim`
+ (3) CometNativeScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#5, d_year#6]
- Arguments: [d_date_sk#5, d_year#6]
+ Batched: true
+ Location [not included in comparison]/{warehouse_dir}/date_dim]
+ PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)]
+ ReadSchema: struct<d_date_sk:int,d_year:int>

(4) CometFilter
Input [2]: [d_date_sk#5, d_year#6]
@@ -92,9 +100,13 @@ Functions [1]: [sum(UnscaledValue(sr_return_amt#3))]
Input [3]: [ctr_customer_sk#8, ctr_store_sk#9, ctr_total_return#10]
Condition : isnotnull(ctr_total_return#10)

- (13) CometNativeScan: `spark_catalog`.`default`.`store_returns`
+ (13) CometNativeScan parquet spark_catalog.default.store_returns
Output [4]: [sr_customer_sk#11, sr_store_sk#12, sr_return_amt#13, sr_returned_date_sk#14]
- Arguments: [sr_customer_sk#11, sr_store_sk#12, sr_return_amt#13, sr_returned_date_sk#14]
+ Batched: true
+ Location: InMemoryFileIndex(0 paths)[]
+ PartitionFilters: [isnotnull(sr_returned_date_sk#14)]
+ PushedFilters: [IsNotNull(sr_store_sk)]
+ ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(14) CometFilter
Input [4]: [sr_customer_sk#11, sr_store_sk#12, sr_return_amt#13, sr_returned_date_sk#14]
@@ -157,13 +169,16 @@ Arguments: [ctr_store_sk#9], [ctr_store_sk#17], Inner, (cast(ctr_total_return#10
Input [5]: [ctr_customer_sk#8, ctr_store_sk#9, ctr_total_return#10, (avg(ctr_total_return) * 1.2)#21, ctr_store_sk#17]
Arguments: [ctr_customer_sk#8, ctr_store_sk#9], [ctr_customer_sk#8, ctr_store_sk#9]

- (28) CometNativeScan: `spark_catalog`.`default`.`store`
+ (28) CometNativeScan parquet spark_catalog.default.store
Output [2]: [s_store_sk#22, s_state#23]
- Arguments: [s_store_sk#22, s_state#23]
+ Batched: true
+ Location [not included in comparison]/{warehouse_dir}/store]
+ PushedFilters: [IsNotNull(s_store_sk)]
+ ReadSchema: struct<s_store_sk:int,s_state:string>

(29) CometFilter
Input [2]: [s_store_sk#22, s_state#23]
- Condition : ((isnotnull(s_state#23) AND (s_state#23 = TN)) AND isnotnull(s_store_sk#22))
+ Condition : ((staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, s_state#23, 2, true, false, true) = TN) AND isnotnull(s_store_sk#22))

(30) CometProject
Input [2]: [s_store_sk#22, s_state#23]
@@ -182,31 +197,38 @@ Arguments: [ctr_store_sk#9], [s_store_sk#22], Inner, BuildRight
Input [3]: [ctr_customer_sk#8, ctr_store_sk#9, s_store_sk#22]
Arguments: [ctr_customer_sk#8], [ctr_customer_sk#8]

- (34) CometNativeScan: `spark_catalog`.`default`.`customer`
+ (34) CometNativeScan parquet spark_catalog.default.customer
Output [2]: [c_customer_sk#24, c_customer_id#25]
- Arguments: [c_customer_sk#24, c_customer_id#25]
+ Batched: true
+ Location [not included in comparison]/{warehouse_dir}/customer]
+ PushedFilters: [IsNotNull(c_customer_sk)]
+ ReadSchema: struct<c_customer_sk:int,c_customer_id:string>

(35) CometFilter
Input [2]: [c_customer_sk#24, c_customer_id#25]
Condition : isnotnull(c_customer_sk#24)

- (36) CometBroadcastExchange
+ (36) CometProject
Input [2]: [c_customer_sk#24, c_customer_id#25]
- Arguments: [c_customer_sk#24, c_customer_id#25]
+ Arguments: [c_customer_sk#24, c_customer_id#26], [c_customer_sk#24, staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, c_customer_id#25, 16, true, false, true) AS c_customer_id#26]

- (37) CometBroadcastHashJoin
+ (37) CometBroadcastExchange
+ Input [2]: [c_customer_sk#24, c_customer_id#26]
+ Arguments: [c_customer_sk#24, c_customer_id#26]
+
+ (38) CometBroadcastHashJoin
Left output [1]: [ctr_customer_sk#8]
- Right output [2]: [c_customer_sk#24, c_customer_id#25]
+ Right output [2]: [c_customer_sk#24, c_customer_id#26]
Arguments: [ctr_customer_sk#8], [c_customer_sk#24], Inner, BuildRight

- (38) CometProject
- Input [3]: [ctr_customer_sk#8, c_customer_sk#24, c_customer_id#25]
- Arguments: [c_customer_id#25], [c_customer_id#25]
+ (39) CometProject
+ Input [3]: [ctr_customer_sk#8, c_customer_sk#24, c_customer_id#26]
+ Arguments: [c_customer_id#26], [c_customer_id#26]

- (39) CometTakeOrderedAndProject
- Input [1]: [c_customer_id#25]
- Arguments: TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#25 ASC NULLS FIRST], output=[c_customer_id#25]), [c_customer_id#25], 100, [c_customer_id#25 ASC NULLS FIRST], [c_customer_id#25]
+ (40) CometTakeOrderedAndProject
+ Input [1]: [c_customer_id#26]
+ Arguments: TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#26 ASC NULLS FIRST], output=[c_customer_id#26]), [c_customer_id#26], 100, 0, [c_customer_id#26 ASC NULLS FIRST], [c_customer_id#26]

- (40) ColumnarToRow [codegen id : 1]
- Input [1]: [c_customer_id#25]
+ (41) CometColumnarToRow [codegen id : 1]
+ Input [1]: [c_customer_id#26]

@@ -0,0 +1,46 @@
+ CometColumnarToRow
+ +- CometTakeOrderedAndProject
+ +- CometProject
+ +- CometBroadcastHashJoin
+ :- CometProject
+ : +- CometBroadcastHashJoin
+ : :- CometProject
+ : : +- CometBroadcastHashJoin
+ : : :- CometFilter
+ : : : +- CometHashAggregate
+ : : : +- CometExchange
+ : : : +- CometHashAggregate
+ : : : +- CometProject
+ : : : +- CometBroadcastHashJoin
+ : : : :- CometFilter
+ : : : : +- CometNativeScan parquet spark_catalog.default.store_returns
+ : : : +- CometBroadcastExchange
+ : : : +- CometProject
+ : : : +- CometFilter
+ : : : +- CometNativeScan parquet spark_catalog.default.date_dim
+ : : +- CometBroadcastExchange
+ : : +- CometFilter
+ : : +- CometHashAggregate
+ : : +- CometExchange
+ : : +- CometHashAggregate
+ : : +- CometHashAggregate
+ : : +- CometExchange
+ : : +- CometHashAggregate
+ : : +- CometProject
+ : : +- CometBroadcastHashJoin
+ : : :- CometFilter
+ : : : +- CometNativeScan parquet spark_catalog.default.store_returns
+ : : +- CometBroadcastExchange
+ : : +- CometProject
+ : : +- CometFilter
+ : : +- CometNativeScan parquet spark_catalog.default.date_dim
+ : +- CometBroadcastExchange
+ : +- CometProject
+ : +- CometFilter
+ : +- CometNativeScan parquet spark_catalog.default.store
+ +- CometBroadcastExchange
+ +- CometProject
+ +- CometFilter
+ +- CometNativeScan parquet spark_catalog.default.customer
+
+ Comet accelerated 43 out of 43 eligible operators (100%). Final plan contains 1 transitions between Spark and Comet.
@@ -1,5 +1,5 @@
WholeStageCodegen (1)
- ColumnarToRow
+ CometColumnarToRow
InputAdapter
CometTakeOrderedAndProject [c_customer_id]
CometProject [c_customer_id]
@@ -9,34 +9,35 @@ WholeStageCodegen (1)
CometProject [ctr_customer_sk,ctr_store_sk]
CometBroadcastHashJoin [ctr_customer_sk,ctr_store_sk,ctr_total_return,(avg(ctr_total_return) * 1.2),ctr_store_sk]
CometFilter [ctr_customer_sk,ctr_store_sk,ctr_total_return]
- CometHashAggregate [ctr_customer_sk,ctr_store_sk,ctr_total_return,sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))]
+ CometHashAggregate [sum] [ctr_customer_sk,ctr_store_sk,ctr_total_return,sr_customer_sk,sr_store_sk,sum(UnscaledValue(sr_return_amt))]
CometExchange [sr_customer_sk,sr_store_sk] #1
- CometHashAggregate [sr_customer_sk,sr_store_sk,sum,sr_return_amt]
+ CometHashAggregate [sr_return_amt] [sr_customer_sk,sr_store_sk,sum]
CometProject [sr_customer_sk,sr_store_sk,sr_return_amt]
CometBroadcastHashJoin [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk,d_date_sk]
CometFilter [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
- CometNativeScan: `spark_catalog`.`default`.`store_returns` [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
+ CometNativeScan parquet spark_catalog.default.store_returns [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
CometBroadcastExchange [d_date_sk] #2
CometProject [d_date_sk]
CometFilter [d_date_sk,d_year]
- CometNativeScan: `spark_catalog`.`default`.`date_dim` [d_date_sk,d_year]
+ CometNativeScan parquet spark_catalog.default.date_dim [d_date_sk,d_year]
CometBroadcastExchange [(avg(ctr_total_return) * 1.2),ctr_store_sk] #3
CometFilter [(avg(ctr_total_return) * 1.2),ctr_store_sk]
- CometHashAggregate [(avg(ctr_total_return) * 1.2),ctr_store_sk,sum,count,avg(ctr_total_return)]
+ CometHashAggregate [sum,count] [(avg(ctr_total_return) * 1.2),ctr_store_sk,avg(ctr_total_return)]
CometExchange [ctr_store_sk] #4
- CometHashAggregate [ctr_store_sk,sum,count,ctr_total_return]
- CometHashAggregate [ctr_store_sk,ctr_total_return,sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))]
+ CometHashAggregate [ctr_total_return] [ctr_store_sk,sum,count]
+ CometHashAggregate [sr_customer_sk,sum] [ctr_store_sk,ctr_total_return,sr_store_sk,sum(UnscaledValue(sr_return_amt))]
CometExchange [sr_customer_sk,sr_store_sk] #5
- CometHashAggregate [sr_customer_sk,sr_store_sk,sum,sr_return_amt]
+ CometHashAggregate [sr_return_amt] [sr_customer_sk,sr_store_sk,sum]
CometProject [sr_customer_sk,sr_store_sk,sr_return_amt]
CometBroadcastHashJoin [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk,d_date_sk]
CometFilter [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
- CometNativeScan: `spark_catalog`.`default`.`store_returns` [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
+ CometNativeScan parquet spark_catalog.default.store_returns [sr_customer_sk,sr_store_sk,sr_return_amt,sr_returned_date_sk]
ReusedExchange [d_date_sk] #2
CometBroadcastExchange [s_store_sk] #6
CometProject [s_store_sk]
CometFilter [s_store_sk,s_state]
- CometNativeScan: `spark_catalog`.`default`.`store` [s_store_sk,s_state]
+ CometNativeScan parquet spark_catalog.default.store [s_store_sk,s_state]
CometBroadcastExchange [c_customer_sk,c_customer_id] #7
- CometFilter [c_customer_sk,c_customer_id]
- CometNativeScan: `spark_catalog`.`default`.`customer` [c_customer_sk,c_customer_id]
+ CometProject [c_customer_id] [c_customer_sk,c_customer_id]
+ CometFilter [c_customer_sk,c_customer_id]
+ CometNativeScan parquet spark_catalog.default.customer [c_customer_sk,c_customer_id]