
Commit 7282d57

feat: Enable bucket pruning with native_datafusion scans (#2888)
1 parent 6d8b8c5 commit 7282d57

File tree

docs/source/contributor-guide/parquet_scans.md
native/core/src/execution/planner.rs
spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

4 files changed (+40, -15 lines)


docs/source/contributor-guide/parquet_scans.md

Lines changed: 0 additions & 1 deletion
@@ -52,7 +52,6 @@ The `native_datafusion` and `native_iceberg_compat` scans share the following li
 
 The `native_datafusion` scan has some additional limitations:
 
-- Bucketed scans are not supported
 - No support for row indexes
 - `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite [#1545]
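
With the bucketed-scan limitation removed from this list, reading a bucketed table no longer forces a fallback when the fully native scan is selected. A minimal sketch of exercising that path, assuming an active SparkSession named `spark`, that `CometConf` is importable from `org.apache.comet`, and that `COMET_NATIVE_SCAN_IMPL` exposes a `.key` like other Spark config entries (both config identifiers appear in the test changes below):

// Sketch only: select the native_datafusion scan and read a bucketed table.
// The import path and .key accessor are assumptions; the config identifiers
// are taken from CometConf as referenced elsewhere in this commit.
import org.apache.comet.CometConf

spark.conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_DATAFUSION)

// A filter on the bucket column prunes buckets; with this commit the resulting
// empty partitions no longer break the native_datafusion scan.
val df = spark.table("bucketed_table1").filter("bucket_col = 1")
df.show()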

native/core/src/execution/planner.rs

Lines changed: 16 additions & 5 deletions
@@ -59,6 +59,7 @@ use datafusion::{
     },
     physical_plan::{
         aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
+        empty::EmptyExec,
         joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
         limit::LocalLimitExec,
         projection::ProjectionExec,
@@ -1047,6 +1048,17 @@ impl PhysicalPlanner {
             .map(|offset| *offset as usize)
             .collect();
 
+        // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
+        let partition_files = &scan.file_partitions[self.partition as usize];
+
+        if partition_files.partitioned_file.is_empty() {
+            let empty_exec = Arc::new(EmptyExec::new(required_schema));
+            return Ok((
+                vec![],
+                Arc::new(SparkPlan::new(spark_plan.plan_id, empty_exec, vec![])),
+            ));
+        }
+
         // Convert the Spark expressions to Physical expressions
         let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
             .data_filters
@@ -1090,13 +1102,12 @@ impl PhysicalPlanner {
             None
         };
 
-        // Get one file from the list of files
-        let one_file = scan
-            .file_partitions
+        // Get one file from this partition (we know it's not empty due to early return above)
+        let one_file = partition_files
+            .partitioned_file
             .first()
-            .and_then(|f| f.partitioned_file.first())
             .map(|f| f.file_path.clone())
-            .ok_or(GeneralError("Failed to locate file".to_string()))?;
+            .expect("partition should have files after empty check");
 
         let object_store_options: HashMap<String, String> = scan
             .object_store_options
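
The early return above exists because Spark's bucket pruning can hand a Comet input partition zero Parquet files, which previously surfaced as the now-removed "Failed to locate file" error. From the Spark side, the pruning that produces such partitions is visible on the scan node. A rough sketch for inspecting it, reusing the table and column names from the new test below (the exact plan text varies by Spark version):

// Sketch: observe bucket pruning in the physical plan. Table and column names
// follow the new "bucketed table with bucket pruning" test below; the
// "SelectedBucketsCount" wording is typical but version-dependent.
val pruned = spark.table("bucketed_table1").filter("bucket_col = 1")

// The file scan node usually reports something like
// "SelectedBucketsCount: 1 out of 8" when pruning kicked in.
println(pruned.queryExecution.executedPlan.treeString)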

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 0 additions & 5 deletions
@@ -79,11 +79,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
     }
 
-    if (scanExec.bucketedScan) {
-      // https://github.com/apache/datafusion-comet/issues/1719
-      withInfo(scanExec, "Full native scan disabled because bucketed scan is not supported")
-    }
-
     // the scan is supported if no fallback reasons were added to the node
     !hasExplainInfo(scanExec)
   }

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 24 additions & 4 deletions
@@ -1581,10 +1581,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("bucketed table") {
-    // native_datafusion actually passes this test, but in the case where buckets are pruned it fails, so we're
-    // falling back for bucketed scans entirely as a workaround.
-    // https://github.com/apache/datafusion-comet/issues/1719
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
     val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
@@ -1595,6 +1591,30 @@
       joinCondition = joinCondition(Seq("i", "j")))
   }
 
+  test("bucketed table with bucket pruning") {
+    withSQLConf(
+      SQLConf.BUCKETING_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      val df1 = (0 until 100).map(i => (i, i % 13, s"left_$i")).toDF("id", "bucket_col", "value")
+      val df2 = (0 until 100).map(i => (i, i % 13, s"right_$i")).toDF("id", "bucket_col", "value")
+      val bucketSpec = Some(BucketSpec(8, Seq("bucket_col"), Nil))
+
+      withTable("bucketed_table1", "bucketed_table2") {
+        withBucket(df1.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table1")
+        withBucket(df2.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table2")
+
+        // Join two bucketed tables, but filter one side to trigger bucket pruning
+        // Only buckets where hash(bucket_col) % 8 matches hash(1) % 8 will be read from left side
+        val left = spark.table("bucketed_table1").filter("bucket_col = 1")
+        val right = spark.table("bucketed_table2")
+
+        val result = left.join(right, Seq("bucket_col"), "inner")
+
+        checkSparkAnswerAndOperator(result)
+      }
+    }
+  }
+
   def withBucket(
       writer: DataFrameWriter[Row],
       bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = {
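
For reference, the `withBucket` helper (its signature appears in the trailing diff context above) applies the optional `BucketSpec` to a `DataFrameWriter`. For the new test's spec, `BucketSpec(8, Seq("bucket_col"), Nil)`, it is roughly equivalent to the plain writer calls below (sketch only; table name illustrative):

// Roughly what withBucket(df1.write.format("parquet"), bucketSpec) amounts to
// for BucketSpec(8, Seq("bucket_col"), Nil): 8 buckets on bucket_col, no sort columns.
df1.write
  .format("parquet")
  .bucketBy(8, "bucket_col")
  .saveAsTable("bucketed_table1")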
