1 change: 0 additions & 1 deletion docs/source/contributor-guide/parquet_scans.md
@@ -52,7 +52,6 @@ The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:

The `native_datafusion` scan has some additional limitations:

- Bucketed scans are not supported
- No support for row indexes
- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
- There are failures in the Spark SQL test suite [#1545]
21 changes: 16 additions & 5 deletions native/core/src/execution/planner.rs
@@ -59,6 +59,7 @@ use datafusion::{
},
physical_plan::{
aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
empty::EmptyExec,
joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
limit::LocalLimitExec,
projection::ProjectionExec,
@@ -1047,6 +1048,17 @@ impl PhysicalPlanner {
.map(|offset| *offset as usize)
.collect();

// Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
let partition_files = &scan.file_partitions[self.partition as usize];

if partition_files.partitioned_file.is_empty() {
let empty_exec = Arc::new(EmptyExec::new(required_schema));
return Ok((
vec![],
Arc::new(SparkPlan::new(spark_plan.plan_id, empty_exec, vec![])),
));
}

// Convert the Spark expressions to Physical expressions
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
.data_filters
@@ -1090,13 +1102,12 @@
None
};

// Get one file from the list of files
let one_file = scan
.file_partitions
// Get one file from this partition (we know it's not empty due to early return above)
let one_file = partition_files
.partitioned_file
.first()
.and_then(|f| f.partitioned_file.first())
.map(|f| f.file_path.clone())
.ok_or(GeneralError("Failed to locate file".to_string()))?;
.expect("partition should have files after empty check");

let object_store_options: HashMap<String, String> = scan
.object_store_options
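The core idea of the planner change above: when bucket pruning leaves a Spark partition with no Parquet files, the native plan for that partition still needs an operator carrying the expected schema, so a zero-row `EmptyExec` is returned instead of trying to build a scan over a file that does not exist. The sketch below illustrates that pattern in isolation, not the PR's actual code path: `FilePartition`, `file_paths`, and `plan_scan_partition` are hypothetical stand-ins, and only the `EmptyExec` usage mirrors the diff.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// Hypothetical stand-in for the list of files assigned to one Spark partition.
struct FilePartition {
    file_paths: Vec<String>,
}

/// Plan one partition: return a zero-row EmptyExec when the partition has no
/// files (e.g. after bucket pruning), otherwise build the real scan.
fn plan_scan_partition(
    partition: &FilePartition,
    required_schema: SchemaRef,
) -> Arc<dyn ExecutionPlan> {
    if partition.file_paths.is_empty() {
        // EmptyExec produces no rows but preserves the schema, so parent
        // operators (joins, aggregates) still see the column types they expect.
        return Arc::new(EmptyExec::new(required_schema));
    }
    // ... build the real Parquet scan for the non-empty partition here ...
    unimplemented!("real scan construction omitted in this sketch")
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("bucket_col", DataType::Int32, false),
    ]));
    let plan = plan_scan_partition(&FilePartition { file_paths: vec![] }, schema);
    // The empty plan still reports the required schema to its parents.
    assert_eq!(plan.schema().fields().len(), 2);
}
```

Returning a schema-preserving empty plan keeps Spark's per-partition execution model intact and avoids special-casing empty partitions in downstream operators.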
@@ -79,11 +79,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
}

if (scanExec.bucketedScan) {
// https://github.com/apache/datafusion-comet/issues/1719
withInfo(scanExec, "Full native scan disabled because bucketed scan is not supported")
}

// the scan is supported if no fallback reasons were added to the node
!hasExplainInfo(scanExec)
}
28 changes: 24 additions & 4 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1581,10 +1581,6 @@ class CometExecSuite extends CometTestBase {
}

test("bucketed table") {
// native_datafusion actually passes this test, but in the case where buckets are pruned it fails, so we're
// falling back for bucketed scans entirely as a workaround.
// https://github.com/apache/datafusion-comet/issues/1719
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
@@ -1595,6 +1591,30 @@
joinCondition = joinCondition(Seq("i", "j")))
}

test("bucketed table with bucket pruning") {
withSQLConf(
SQLConf.BUCKETING_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
val df1 = (0 until 100).map(i => (i, i % 13, s"left_$i")).toDF("id", "bucket_col", "value")
val df2 = (0 until 100).map(i => (i, i % 13, s"right_$i")).toDF("id", "bucket_col", "value")
val bucketSpec = Some(BucketSpec(8, Seq("bucket_col"), Nil))

withTable("bucketed_table1", "bucketed_table2") {
withBucket(df1.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table1")
withBucket(df2.write.format("parquet"), bucketSpec).saveAsTable("bucketed_table2")

// Join two bucketed tables, but filter one side to trigger bucket pruning
// Only buckets where hash(bucket_col) % 8 matches hash(1) % 8 will be read from the left side
val left = spark.table("bucketed_table1").filter("bucket_col = 1")
val right = spark.table("bucketed_table2")

val result = left.join(right, Seq("bucket_col"), "inner")

checkSparkAnswerAndOperator(result)
}
}
}

def withBucket(
writer: DataFrameWriter[Row],
bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = {