Skip to content

Commit 22d6204

Browse files
authored
perf: Avoid FFI copy in ScanExec when reading data from exchanges (#2268)
1 parent 60776f2 commit 22d6204

File tree

4 files changed

+57
-8
lines changed

4 files changed

+57
-8
lines changed

native/core/src/execution/operators/scan.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ pub struct ScanExec {
8181
jvm_fetch_time: Time,
8282
/// Time spent in FFI
8383
arrow_ffi_time: Time,
84+
/// Whether native code can assume ownership of batches that it receives
85+
arrow_ffi_safe: bool,
8486
}
8587

8688
impl ScanExec {
@@ -89,6 +91,7 @@ impl ScanExec {
8991
input_source: Option<Arc<GlobalRef>>,
9092
input_source_description: &str,
9193
data_types: Vec<DataType>,
94+
arrow_ffi_safe: bool,
9295
) -> Result<Self, CometError> {
9396
let metrics_set = ExecutionPlanMetricsSet::default();
9497
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -109,6 +112,7 @@ impl ScanExec {
109112
data_types.len(),
110113
&jvm_fetch_time,
111114
&arrow_ffi_time,
115+
arrow_ffi_safe,
112116
)?;
113117
timer.stop();
114118
batch
@@ -139,6 +143,7 @@ impl ScanExec {
139143
jvm_fetch_time,
140144
arrow_ffi_time,
141145
schema,
146+
arrow_ffi_safe,
142147
})
143148
}
144149

@@ -187,6 +192,7 @@ impl ScanExec {
187192
self.data_types.len(),
188193
&self.jvm_fetch_time,
189194
&self.arrow_ffi_time,
195+
self.arrow_ffi_safe,
190196
)?;
191197
*current_batch = Some(next_batch);
192198
}
@@ -203,6 +209,7 @@ impl ScanExec {
203209
num_cols: usize,
204210
jvm_fetch_time: &Time,
205211
arrow_ffi_time: &Time,
212+
arrow_ffi_safe: bool,
206213
) -> Result<InputBatch, CometError> {
207214
if exec_context_id == TEST_EXEC_CONTEXT_ID {
208215
// This is a unit test. We don't need to call JNI.
@@ -281,10 +288,14 @@ impl ScanExec {
281288

282289
let array = make_array(array_data);
283290

284-
// we copy the array so that we don't have to worry about potential memory
285-
// corruption issues later on if underlying buffers are reused or freed
286-
// TODO optimize this so that we only do this for Parquet inputs!
287-
let array = copy_array(&array);
291+
let array = if arrow_ffi_safe {
292+
// ownership of this array has been transferred to native
293+
array
294+
} else {
295+
// it is necessary to copy the array because the contents may be
296+
// overwritten on the JVM side in the future
297+
copy_array(&array)
298+
};
288299

289300
inputs.push(array);
290301

native/core/src/execution/planner.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,8 +1468,13 @@ impl PhysicalPlanner {
14681468
};
14691469

14701470
// The `ScanExec` operator will take actual arrays from Spark during execution
1471-
let scan =
1472-
ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
1471+
let scan = ScanExec::new(
1472+
self.exec_context_id,
1473+
input_source,
1474+
&scan.source,
1475+
data_types,
1476+
scan.arrow_ffi_safe,
1477+
)?;
14731478

14741479
Ok((
14751480
vec![scan.clone()],
@@ -2838,6 +2843,7 @@ mod tests {
28382843
type_info: None,
28392844
}],
28402845
source: "".to_string(),
2846+
arrow_ffi_safe: false,
28412847
})),
28422848
};
28432849

@@ -2911,6 +2917,7 @@ mod tests {
29112917
type_info: None,
29122918
}],
29132919
source: "".to_string(),
2920+
arrow_ffi_safe: false,
29142921
})),
29152922
};
29162923

@@ -3121,6 +3128,7 @@ mod tests {
31213128
op_struct: Some(OpStruct::Scan(spark_operator::Scan {
31223129
fields: vec![create_proto_datatype()],
31233130
source: "".to_string(),
3131+
arrow_ffi_safe: false,
31243132
})),
31253133
}
31263134
}
@@ -3163,6 +3171,7 @@ mod tests {
31633171
},
31643172
],
31653173
source: "".to_string(),
3174+
arrow_ffi_safe: false,
31663175
})),
31673176
};
31683177

@@ -3277,6 +3286,7 @@ mod tests {
32773286
},
32783287
],
32793288
source: "".to_string(),
3289+
arrow_ffi_safe: false,
32803290
})),
32813291
};
32823292

native/proto/src/proto/operator.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ message Scan {
7777
// is purely for informational purposes when viewing native query plans in
7878
// debug mode.
7979
string source = 2;
80+
// Whether native code can assume ownership of batches that it receives
81+
bool arrow_ffi_safe = 3;
8082
}
8183

8284
message NativeScan {

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,23 @@ object QueryPlanSerde extends Logging with CometExprShim {
18381838
scanBuilder.setSource(source)
18391839
}
18401840

1841+
val ffiSafe = op match {
1842+
case _ if isExchangeSink(op) =>
1843+
// Source of broadcast exchange batches is ArrowStreamReader
1844+
// Source of shuffle exchange batches is NativeBatchDecoderIterator
1845+
true
1846+
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
1847+
// native_comet scan reuses mutable buffers
1848+
false
1849+
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
1850+
// native_iceberg_compat scan reuses mutable buffers for constant columns
1851+
// https://github.com/apache/datafusion-comet/issues/2152
1852+
false
1853+
case _ =>
1854+
false
1855+
}
1856+
scanBuilder.setArrowFfiSafe(ffiSafe)
1857+
18411858
val scanTypes = op.output.flatten { attr =>
18421859
serializeDataType(attr.dataType)
18431860
}
@@ -1889,22 +1906,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
18891906
* called.
18901907
*/
18911908
private def isCometSink(op: SparkPlan): Boolean = {
1909+
if (isExchangeSink(op)) {
1910+
return true
1911+
}
18921912
op match {
18931913
case s if isCometScan(s) => true
18941914
case _: CometSparkToColumnarExec => true
18951915
case _: CometSinkPlaceHolder => true
18961916
case _: CoalesceExec => true
18971917
case _: CollectLimitExec => true
18981918
case _: UnionExec => true
1919+
case _: TakeOrderedAndProjectExec => true
1920+
case _: WindowExec => true
1921+
case _ => false
1922+
}
1923+
}
1924+
1925+
private def isExchangeSink(op: SparkPlan): Boolean = {
1926+
op match {
18991927
case _: ShuffleExchangeExec => true
19001928
case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
19011929
case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
1902-
case _: TakeOrderedAndProjectExec => true
19031930
case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
19041931
case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
19051932
true
19061933
case _: BroadcastExchangeExec => true
1907-
case _: WindowExec => true
19081934
case _ => false
19091935
}
19101936
}

0 commit comments

Comments
 (0)