Skip to content

Commit c429bea

Browse files
authored
fix: Improve logic for determining when an UnpackOrDeepCopy is needed (#2142)
1 parent f8ed109 commit c429bea

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

native/core/src/execution/operators/copy.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ impl CopyExec {
9191
mode,
9292
}
9393
}
94+
95+
/// Returns a shared reference to this operator's `input` execution plan.
///
/// The returned `Arc` is borrowed, not cloned; callers that need ownership
/// can clone it themselves.
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
96+
&self.input
97+
}
98+
99+
/// Returns a reference to the `CopyMode` stored in this operator's `mode` field.
pub fn mode(&self) -> &CopyMode {
100+
&self.mode
101+
}
94102
}
95103

96104
impl DisplayAs for CopyExec {

native/core/src/execution/planner.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,10 +2610,23 @@ impl From<ExpressionError> for DataFusionError {
26102610
/// modification. This is used to determine if we need to copy the input batch to avoid
26112611
/// data corruption from reusing the input batch.
26122612
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
2613-
if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
2614-
can_reuse_input_batch(op.children()[0])
2613+
if op.as_any().is::<ScanExec>() {
2614+
// JVM side can return arrow buffers to the pool
2615+
// Also, native_comet scan reuses mutable buffers
2616+
true
2617+
} else if op.as_any().is::<CopyExec>() {
2618+
let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap();
2619+
copy_exec.mode() == &CopyMode::UnpackOrClone && can_reuse_input_batch(copy_exec.input())
2620+
} else if op.as_any().is::<CometFilterExec>() {
2621+
// CometFilterExec guarantees that all arrays have been copied
2622+
false
26152623
} else {
2616-
op.as_any().is::<ScanExec>()
2624+
for child in op.children() {
2625+
if can_reuse_input_batch(child) {
2626+
return true;
2627+
}
2628+
}
2629+
false
26172630
}
26182631
}
26192632

0 commit comments

Comments
 (0)