Commit 8bda6d9

Extract only this Spark partition's PartitionedFiles in planner.rs, and always execute partition 0. (#2675)
1 parent 796d445 commit 8bda6d9

4 files changed: +8 -24 lines


native/core/src/execution/jni_api.rs

Lines changed: 3 additions & 1 deletion
@@ -485,12 +485,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         }

         let task_ctx = exec_context.session_ctx.task_ctx();
+        // Each Comet native execution corresponds to a single Spark partition,
+        // so we should always execute partition 0.
         let stream = exec_context
             .root_op
             .as_ref()
             .unwrap()
             .native_plan
-            .execute(partition as usize, task_ctx)?;
+            .execute(0, task_ctx)?;
         exec_context.stream = Some(stream);
     } else {
         // Pull input batches
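This hunk relies on an invariant established by the planner change below: the deserialized plan contains only the current Spark partition's files, so it exposes exactly one native partition and index 0 is always valid. A minimal, self-contained sketch of that invariant, using a hypothetical NativePlan type rather than Comet's actual API:

// Hypothetical, simplified model: a native plan built for one Spark
// partition exposes exactly one output partition.
struct NativePlan {
    file_groups: Vec<Vec<String>>, // one group per output partition
}

impl NativePlan {
    fn output_partitions(&self) -> usize {
        self.file_groups.len()
    }

    fn execute(&self, partition: usize) -> Result<&[String], String> {
        self.file_groups
            .get(partition)
            .map(|g| g.as_slice())
            .ok_or_else(|| format!("invalid partition {partition}"))
    }
}

fn main() {
    // The planner places only the current Spark partition's files in the
    // plan, so the plan always has one partition and execute(0, ..) is safe.
    let plan = NativePlan {
        file_groups: vec![vec!["part-00002.parquet".to_string()]],
    };
    assert_eq!(plan.output_partitions(), 1);
    let files = plan.execute(0).unwrap();
    println!("scanning {files:?}");
}

With only one file group in the plan, the JNI entry point no longer needs the Spark partition index to select a DataFusion partition, hence the hard-coded 0.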

native/core/src/execution/planner.rs

Lines changed: 5 additions & 11 deletions
@@ -1317,17 +1317,11 @@ impl PhysicalPlanner {
             &object_store_options,
         )?;

-        // Generate file groups
-        let mut file_groups: Vec<Vec<PartitionedFile>> =
-            Vec::with_capacity(partition_count);
-        scan.file_partitions.iter().try_for_each(|partition| {
-            let files = self.get_partitioned_files(partition)?;
-            file_groups.push(files);
-            Ok::<(), ExecutionError>(())
-        })?;
-
-        // TODO: I think we can remove partition_count in the future, but leave for testing.
-        assert_eq!(file_groups.len(), partition_count);
+        // Comet serializes all partitions' PartitionedFiles, but we only want to read this
+        // Spark partition's PartitionedFiles
+        let files =
+            self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+        let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
         let partition_fields: Vec<Field> = partition_schema
             .fields()
             .iter()
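For context, a standalone sketch of the before/after shape of this change, with hypothetical stand-in types (Scan, Planner) in place of Comet's real planner machinery: one file group is now built from file_partitions[self.partition] instead of one group per Spark partition:

// Illustration only: simplified stand-ins for Comet/DataFusion types.
type PartitionedFile = String;

struct Scan {
    // One serialized entry per Spark partition, as shipped from the JVM side.
    file_partitions: Vec<Vec<PartitionedFile>>,
}

struct Planner {
    // The Spark partition this native plan is being built for.
    partition: i32,
}

impl Planner {
    // Old behavior: every entry of scan.file_partitions became a file group.
    // New behavior: only the current partition's files are extracted, so the
    // scan ends up with a single file group (and a single output partition).
    fn file_groups(&self, scan: &Scan) -> Vec<Vec<PartitionedFile>> {
        let files = scan.file_partitions[self.partition as usize].clone();
        vec![files]
    }
}

fn main() {
    let scan = Scan {
        file_partitions: vec![
            vec!["part-00000.parquet".into()],
            vec!["part-00001.parquet".into()],
        ],
    };
    let planner = Planner { partition: 1 };
    assert_eq!(
        planner.file_groups(&scan),
        vec![vec!["part-00001.parquet".to_string()]]
    );
}

This is also why the assert_eq!(file_groups.len(), partition_count) check could be dropped: the group count is 1 by construction.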

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 0 additions & 4 deletions
@@ -200,10 +200,6 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }

   test("join") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 0 additions & 8 deletions
@@ -74,10 +74,6 @@ class CometJoinSuite extends CometTestBase {
   }

   test("Broadcast HashJoin without join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -105,10 +101,6 @@ class CometJoinSuite extends CometTestBase {
   }

   test("Broadcast HashJoin with join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
