From cabf3777036da99a6aebb2b0debd11d4b84e1652 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 12 Dec 2025 13:00:03 -0500
Subject: [PATCH 1/2] Enable bucket pruning with native_datafusion by using
 EmptyExec when a partition has no files.

---
 native/core/src/execution/planner.rs             | 21 ++++++++++----
 .../serde/operator/CometNativeScan.scala         |  5 ----
 .../apache/comet/exec/CometExecSuite.scala       | 28 ++++++++++++++++---
 3 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 570cb6c47b..712bedc2df 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -59,6 +59,7 @@ use datafusion::{
     },
     physical_plan::{
         aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
+        empty::EmptyExec,
         joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
         limit::LocalLimitExec,
         projection::ProjectionExec,
@@ -1047,6 +1048,17 @@ impl PhysicalPlanner {
            .map(|offset| *offset as usize)
            .collect();

+        // Check if this partition has any files (a bucketed scan with bucket pruning may have empty partitions)
+        let partition_files = &scan.file_partitions[self.partition as usize];
+
+        if partition_files.partitioned_file.is_empty() {
+            let empty_exec = Arc::new(EmptyExec::new(required_schema));
+            return Ok((
+                vec![],
+                Arc::new(SparkPlan::new(spark_plan.plan_id, empty_exec, vec![])),
+            ));
+        }
+
        // Convert the Spark expressions to Physical expressions
        let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
            .data_filters
@@ -1090,13 +1102,12 @@
            None
        };

-        // Get one file from the list of files
-        let one_file = scan
-            .file_partitions
+        // Get one file from this partition (we know it's not empty due to the early return above)
+        let one_file = partition_files
+            .partitioned_file
            .first()
-            .and_then(|f| f.partitioned_file.first())
            .map(|f| f.file_path.clone())
-            .ok_or(GeneralError("Failed to locate file".to_string()))?;
+            .expect("partition should have files after empty check");

        let object_store_options: HashMap<String, String> = scan
            .object_store_options
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b679d9bf4f..6381d1333f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -79,11 +79,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
      withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
    }

-    if (scanExec.bucketedScan) {
-      // https://github.com/apache/datafusion-comet/issues/1719
-      withInfo(scanExec, "Full native scan disabled because bucketed scan is not supported")
-    }
-
    // the scan is supported if no fallback reasons were added to the node
    !hasExplainInfo(scanExec)
  }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9f9df73a91..1b2373ad71 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1581,10 +1581,6 @@ class CometExecSuite extends CometTestBase {
  }

  test("bucketed table") {
-    // native_datafusion actually passes this test, but in the case where buckets are pruned it fails, so we're
-    // falling back for bucketed scans entirely as a workaround.
-    // https://github.com/apache/datafusion-comet/issues/1719
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
    val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
@@ -1595,6 +1591,30 @@
    testBucketing(
      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
      joinCondition = joinCondition(Seq("i", "j")))
  }

+  test("bucketed table with bucket pruning") {
+    withSQLConf(
+      SQLConf.BUCKETING_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      val df1 = (0 until 100).map(i => (i, i % 13, s"left_$i")).toDF("id", "bucket_col", "value")
+      val df2 = (0 until 100).map(i => (i, i % 13, s"right_$i")).toDF("id", "bucket_col", "value")
+      val bucketSpec = Some(BucketSpec(8, Seq("bucket_col"), Nil))
+
+      withTable("bucketed_table1", "bucketed_table2") {
+        withBucket(df1.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table1")
+        withBucket(df2.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table2")
+
+        // Join two bucketed tables, but filter one side to trigger bucket pruning.
+        // Only buckets where hash(bucket_col) % 8 matches hash(1) % 8 will be read from the left side.
+        val left = spark.table("bucketed_table1").filter("bucket_col = 1")
+        val right = spark.table("bucketed_table2")
+
+        val result = left.join(right, Seq("bucket_col"), "inner")
+
+        checkSparkAnswerAndOperator(result)
+      }
+    }
+  }
+
  def withBucket(
      writer: DataFrameWriter[Row],
      bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = {

From 17876f7cf8d77183846215b0cb422a14f3aab7e0 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 12 Dec 2025 14:18:56 -0500
Subject: [PATCH 2/2] Update parquet_scans.md

---
 docs/source/contributor-guide/parquet_scans.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md
index 3a2a78ad27..8c2fbfd8f6 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -52,7 +52,6 @@ The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:

 The `native_datafusion` scan has some additional limitations:

-- Bucketed scans are not supported
 - No support for row indexes
 - `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite [#1545]
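
Reviewer note: the planner change above leans on DataFusion's `EmptyExec`, a zero-row operator that still reports a schema, so a partition whose buckets were all pruned produces an empty result with the scan's expected columns instead of the old "Failed to locate file" error. Below is a minimal standalone sketch of that pattern; it is not Comet's planner, and `plan_partition` / `build_parquet_scan` are hypothetical names used only for illustration.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::error::Result;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical stand-in for the real Parquet scan construction; this sketch
/// only exercises the empty-partition branch, so the body is a placeholder.
fn build_parquet_scan(_files: &[String], schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(Arc::new(EmptyExec::new(schema)))
}

/// Plan one partition's scan, falling back to `EmptyExec` when bucket
/// pruning left the partition with no files.
fn plan_partition(files: &[String], schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> {
    if files.is_empty() {
        // Zero-row plan that still carries the scan's schema, so parent
        // operators (joins, aggregates) see the columns they expect.
        return Ok(Arc::new(EmptyExec::new(schema)));
    }
    build_parquet_scan(files, schema)
}

fn main() -> Result<()> {
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("bucket_col", DataType::Int32, false),
        Field::new("value", DataType::Utf8, true),
    ]));

    // Simulate a partition whose files were all removed by bucket pruning.
    let plan = plan_partition(&[], Arc::clone(&schema))?;
    assert_eq!(plan.schema(), schema);
    Ok(())
}
```

Returning a schema-bearing empty plan, rather than erroring out, is what lets parent operators such as the join in the new test keep validating and executing normally.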