Skip to content

Commit 6fd2f9f

Browse files
authored
Add CopyExec to inputs to SortMergeJoinExec (#2155)
1 parent 441e5e7 commit 6fd2f9f

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

native/core/src/execution/planner.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1603,9 +1603,12 @@ impl PhysicalPlanner {
16031603
})
16041604
.collect();
16051605

1606+
let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
1607+
let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
1608+
16061609
let join = Arc::new(SortMergeJoinExec::try_new(
1607-
Arc::clone(&join_params.left.native_plan),
1608-
Arc::clone(&join_params.right.native_plan),
1610+
Arc::clone(&left),
1611+
Arc::clone(&right),
16091612
join_params.join_on,
16101613
join_params.join_filter,
16111614
join_params.join_type,
@@ -2611,8 +2614,10 @@ impl From<ExpressionError> for DataFusionError {
26112614
/// data corruption from reusing the input batch.
26122615
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
26132616
if op.as_any().is::<ScanExec>() {
2614-
// JVM side can return arrow buffers to the pool
2615-
// Also, native_comet scan reuses mutable buffers
2617+
// native_comet and native_iceberg_compat scan reuse mutable buffers
2618+
// so we need to make copies of the batches
2619+
// for now, we also copy even if the source is not a Parquet scan, but
2620+
// we will optimize this later
26162621
true
26172622
} else if op.as_any().is::<CopyExec>() {
26182623
let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap();

0 commit comments

Comments
 (0)