Commit 2c307b7

chore: remove CopyExec (#2663)
1 parent b646be0 commit 2c307b7

File tree

5 files changed: +28 -263 lines

native/core/src/execution/operators/copy.rs

Lines changed: 3 additions & 217 deletions
@@ -16,38 +16,11 @@
 // under the License.
 
 use arrow::compute::{cast_with_options, CastOptions};
-use futures::{Stream, StreamExt};
-use std::{
-    any::Any,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
+use std::sync::Arc;
 
-use arrow::array::{
-    downcast_dictionary_array, make_array, Array, ArrayRef, MutableArrayData, RecordBatch,
-    RecordBatchOptions,
-};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::array::{downcast_dictionary_array, make_array, Array, ArrayRef, MutableArrayData};
+use arrow::datatypes::DataType;
 use arrow::error::ArrowError;
-use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
-use datafusion::physical_plan::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
-
-/// An utility execution node which makes deep copies of input batches.
-///
-/// In certain scenarios like sort, DF execution nodes only make shallow copy of input batches.
-/// This could cause issues for Comet, since we re-use column vectors across different batches.
-/// For those scenarios, this can be used as an adapter node.
-#[derive(Debug)]
-pub struct CopyExec {
-    input: Arc<dyn ExecutionPlan>,
-    schema: SchemaRef,
-    cache: PlanProperties,
-    metrics: ExecutionPlanMetricsSet,
-    mode: CopyMode,
-}
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum CopyMode {
@@ -57,193 +30,6 @@ pub enum CopyMode {
     UnpackOrClone,
 }
 
-impl CopyExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>, mode: CopyMode) -> Self {
-        // change schema to remove dictionary types because CopyExec always unpacks
-        // dictionaries
-
-        let fields: Vec<Field> = input
-            .schema()
-            .fields
-            .iter()
-            .map(|f: &FieldRef| match f.data_type() {
-                DataType::Dictionary(_, value_type) => {
-                    Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable())
-                }
-                _ => f.as_ref().clone(),
-            })
-            .collect();
-
-        let schema = Arc::new(Schema::new(fields));
-
-        let cache = PlanProperties::new(
-            EquivalenceProperties::new(Arc::clone(&schema)),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        );
-
-        Self {
-            input,
-            schema,
-            cache,
-            metrics: ExecutionPlanMetricsSet::default(),
-            mode,
-        }
-    }
-
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    pub fn mode(&self) -> &CopyMode {
-        &self.mode
-    }
-}
-
-impl DisplayAs for CopyExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CopyExec [{:?}]", self.mode)
-            }
-            DisplayFormatType::TreeRender => unimplemented!(),
-        }
-    }
-}
-
-impl ExecutionPlan for CopyExec {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![&self.input]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let input = Arc::clone(&self.input);
-        let new_input = input.with_new_children(children)?;
-        Ok(Arc::new(CopyExec {
-            input: new_input,
-            schema: Arc::clone(&self.schema),
-            cache: self.cache.clone(),
-            metrics: self.metrics.clone(),
-            mode: self.mode.clone(),
-        }))
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> DataFusionResult<SendableRecordBatchStream> {
-        let child_stream = self.input.execute(partition, context)?;
-        Ok(Box::pin(CopyStream::new(
-            self,
-            self.schema(),
-            child_stream,
-            partition,
-            self.mode.clone(),
-        )))
-    }
-
-    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
-        self.input.partition_statistics(partition)
-    }
-
-    fn properties(&self) -> &PlanProperties {
-        &self.cache
-    }
-
-    fn name(&self) -> &str {
-        "CopyExec"
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        vec![true; self.children().len()]
-    }
-
-    fn supports_limit_pushdown(&self) -> bool {
-        true
-    }
-
-    fn cardinality_effect(&self) -> CardinalityEffect {
-        CardinalityEffect::Equal
-    }
-}
-
-struct CopyStream {
-    schema: SchemaRef,
-    child_stream: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
-    mode: CopyMode,
-}
-
-impl CopyStream {
-    fn new(
-        exec: &CopyExec,
-        schema: SchemaRef,
-        child_stream: SendableRecordBatchStream,
-        partition: usize,
-        mode: CopyMode,
-    ) -> Self {
-        Self {
-            schema,
-            child_stream,
-            baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
-            mode,
-        }
-    }
-
-    // TODO: replace copy_or_cast_array with copy_array if upstream sort kernel fixes
-    // dictionary array sorting issue.
-    fn copy(&self, batch: RecordBatch) -> DataFusionResult<RecordBatch> {
-        let mut timer = self.baseline_metrics.elapsed_compute().timer();
-        let vectors = batch
-            .columns()
-            .iter()
-            .map(|v| copy_or_unpack_array(v, &self.mode))
-            .collect::<Result<Vec<ArrayRef>, _>>()?;
-
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-        let maybe_batch =
-            RecordBatch::try_new_with_options(Arc::clone(&self.schema), vectors, &options)
-                .map_err(|e| arrow_datafusion_err!(e));
-        timer.stop();
-        self.baseline_metrics.record_output(batch.num_rows());
-        maybe_batch
-    }
-}
-
-impl Stream for CopyStream {
-    type Item = DataFusionResult<RecordBatch>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.child_stream.poll_next_unpin(cx).map(|x| match x {
-            Some(Ok(batch)) => Some(self.copy(batch)),
-            other => other,
-        })
-    }
-}
-
-impl RecordBatchStream for CopyStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 /// Copy an Arrow Array
 pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef {
     let capacity = array.len();
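Note that the copy_array helper shown at the end of the hunk survives this commit; only the CopyExec operator and its stream wrapper are removed. For context, here is a minimal, self-contained sketch of the deep-copy technique that helper is built on (MutableArrayData plus make_array, consistent with the retained signature). The deep_copy name and the Int32 input are illustrative, not part of the commit:

use std::sync::Arc;

use arrow::array::{make_array, Array, ArrayRef, Int32Array, MutableArrayData};

// Sketch of a deep copy: copy the underlying buffers instead of bumping
// reference counts, so the result does not alias the input's memory.
fn deep_copy(array: &dyn Array) -> ArrayRef {
    let data = array.to_data();
    let mut mutable = MutableArrayData::new(vec![&data], false, array.len());
    mutable.extend(0, 0, array.len());
    make_array(mutable.freeze())
}

fn main() {
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let copied = deep_copy(input.as_ref());
    assert_eq!(copied.len(), 3);
}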

native/core/src/execution/planner.rs

Lines changed: 12 additions & 28 deletions
@@ -17,12 +17,11 @@
 
 //! Converts Spark physical plan to DataFusion physical plan
 
-use crate::execution::operators::CopyMode;
 use crate::{
     errors::ExpressionError,
     execution::{
         expressions::subquery::Subquery,
-        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
+        operators::{ExecutionError, ExpandExec, ScanExec},
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -1222,15 +1221,13 @@ impl PhysicalPlanner {
                     .collect();
 
                 let fetch = sort.fetch.map(|num| num as usize);
-                // SortExec caches batches so we need to make a copy of incoming batches. Also,
-                // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
-                // it would be more efficient if we could avoid that.
-                // https://github.com/apache/datafusion-comet/issues/963
-                let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
 
                 let mut sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
-                    SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied))
-                        .with_fetch(fetch),
+                    SortExec::new(
+                        LexOrdering::new(exprs?).unwrap(),
+                        Arc::clone(&child.native_plan),
+                    )
+                    .with_fetch(fetch),
                 );
 
                 if let Some(skip) = sort.skip.filter(|&n| n > 0).map(|n| n as usize) {
@@ -1394,13 +1391,9 @@ impl PhysicalPlanner {
                 assert_eq!(children.len(), 1);
                 let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
 
-                // We wrap native shuffle in a CopyExec. This existed previously, but for
-                // RangePartitioning at least we want to ensure that dictionaries are unpacked.
-                let wrapped_child = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
-
                 let partitioning = self.create_partitioning(
                     writer.partitioning.as_ref().unwrap(),
-                    wrapped_child.schema(),
+                    child.native_plan.schema(),
                 )?;
 
                 let codec = match writer.codec.try_into() {
@@ -1417,7 +1410,7 @@ impl PhysicalPlanner {
                 }?;
 
                 let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
-                    wrapped_child,
+                    Arc::clone(&child.native_plan),
                     partitioning,
                     codec,
                     writer.output_data_file.clone(),
@@ -1507,8 +1500,8 @@ impl PhysicalPlanner {
                 })
                 .collect();
 
-        let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-        let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+        let left = Arc::clone(&join_params.left.native_plan);
+        let right = Arc::clone(&join_params.right.native_plan);
 
         let join = Arc::new(SortMergeJoinExec::try_new(
             Arc::clone(&left),
@@ -1570,12 +1563,8 @@ impl PhysicalPlanner {
            partition_count,
        )?;
 
-        // HashJoinExec may cache the input batch internally. We need
-        // to copy the input batch to avoid the data corruption from reusing the input
-        // batch. We also need to unpack dictionary arrays, because the join operators
-        // do not support them.
-        let left = Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-        let right = Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+        let left = Arc::clone(&join_params.left.native_plan);
+        let right = Arc::clone(&join_params.right.native_plan);
 
         let hash_join = Arc::new(HashJoinExec::try_new(
            left,
@@ -1805,11 +1794,6 @@ impl PhysicalPlanner {
        ))
    }
 
-    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays.
-    fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
-        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
-    }
-
    /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
    fn create_agg_expr(
        &self,
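The deleted wrap_in_copy_exec helper wrapped each child in CopyExec with CopyMode::UnpackOrClone, so sort, shuffle, and join inputs arrived with dictionary-encoded columns unpacked to their plain value types. As a hedged sketch of that unpacking step, based on the cast_with_options import that copy.rs retains after this commit (the unpack_dictionary name and example values are hypothetical, not the actual Comet code):

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType, Int32Type};
use arrow::error::ArrowError;

// Unpack a dictionary-encoded array to its plain value type; pass
// everything else through unchanged.
fn unpack_dictionary(array: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    match array.data_type() {
        DataType::Dictionary(_, value_type) => {
            cast_with_options(array, value_type.as_ref(), &CastOptions::default())
        }
        _ => Ok(Arc::clone(array)),
    }
}

fn main() -> Result<(), ArrowError> {
    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let array: ArrayRef = Arc::new(dict);
    let unpacked = unpack_dictionary(&array)?;
    // Dictionary(Int32, Utf8) becomes plain Utf8 after the cast.
    assert_eq!(unpacked.data_type(), &DataType::Utf8);
    Ok(())
}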

native/core/src/execution/spark_plan.rs

Lines changed: 1 addition & 18 deletions
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::operators::CopyExec;
 use arrow::datatypes::SchemaRef;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
@@ -43,15 +42,11 @@ impl SparkPlan {
         native_plan: Arc<dyn ExecutionPlan>,
         children: Vec<Arc<SparkPlan>>,
     ) -> Self {
-        let mut additional_native_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut additional_native_plans);
-        }
         Self {
             plan_id,
             native_plan,
             children,
-            additional_native_plans,
+            additional_native_plans: vec![],
         }
    }
 
@@ -66,9 +61,6 @@ impl SparkPlan {
         for plan in &additional_native_plans {
             accum.push(Arc::clone(plan));
         }
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut accum);
-        }
         Self {
             plan_id,
             native_plan,
@@ -87,12 +79,3 @@ impl SparkPlan {
         &self.children
     }
 }
-
-fn collect_additional_plans(
-    child: Arc<dyn ExecutionPlan>,
-    additional_native_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
-) {
-    if child.as_any().is::<CopyExec>() {
-        additional_native_plans.push(Arc::clone(&child));
-    }
-}
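The deleted collect_additional_plans helper spotted CopyExec children by concrete type so their metrics could be folded into the parent SparkPlan; with the operator gone, SparkPlan::new now always passes an empty vec here. The type test relied on the standard Any downcast pattern; a minimal sketch with stand-in types (Plan, CopyLike, and is_copy_like are illustrative, not the real DataFusion traits):

use std::any::Any;
use std::sync::Arc;

// Stand-in for DataFusion's ExecutionPlan, which exposes as_any() the same way.
trait Plan {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for the removed CopyExec operator.
struct CopyLike;

impl Plan for CopyLike {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn is_copy_like(plan: &Arc<dyn Plan>) -> bool {
    // Any::is::<T>() tests the concrete type behind the trait object,
    // which is how the removed helper recognized CopyExec children.
    plan.as_any().is::<CopyLike>()
}

fn main() {
    let plan: Arc<dyn Plan> = Arc::new(CopyLike);
    assert!(is_copy_like(&plan));
}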

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -200,6 +200,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   test("join") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")

0 commit comments
