Skip to content

Commit ef3e33c

Browse files
authored
Generalize functions for NetworkCoalesceExec creation (#162)
1 parent 0ce58c3 commit ef3e33c

File tree

2 files changed

+5
-19
lines changed

2 files changed

+5
-19
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl DistributedPhysicalOptimizerRule {
160160
{
161161
return Ok(Transformed::no(plan));
162162
}
163-
let node = NetworkCoalesceExec::from_coalesce_partitions_exec(node, tasks)?;
163+
let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;
164164

165165
let plan = plan.with_new_children(vec![Arc::new(node)])?;
166166

@@ -174,7 +174,7 @@ impl DistributedPhysicalOptimizerRule {
174174
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
175175
self.network_coalesce_tasks,
176176
) {
177-
let node = NetworkCoalesceExec::from_sort_preserving_merge_exec(node, tasks)?;
177+
let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;
178178

179179
let plan = plan.with_new_children(vec![Arc::new(node)])?;
180180

src/execution_plans/network_coalesce.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use dashmap::DashMap;
1616
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1717
use datafusion::error::DataFusionError;
1818
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
19-
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
20-
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
2119
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2220
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
2321
use futures::{TryFutureExt, TryStreamExt};
@@ -101,21 +99,9 @@ pub struct NetworkCoalesceReady {
10199
}
102100

103101
impl NetworkCoalesceExec {
104-
pub fn from_coalesce_partitions_exec(
105-
input: &CoalescePartitionsExec,
106-
input_tasks: usize,
107-
) -> Result<Self, DataFusionError> {
108-
Self::from_input(input, input_tasks)
109-
}
110-
111-
pub fn from_sort_preserving_merge_exec(
112-
input: &SortPreservingMergeExec,
113-
input_tasks: usize,
114-
) -> Result<Self, DataFusionError> {
115-
Self::from_input(input, input_tasks)
116-
}
117-
118-
pub fn from_input(
102+
/// Creates a new [NetworkCoalesceExec] node from a [CoalescePartitionsExec] and
103+
/// [SortPreservingMergeExec].
104+
pub fn from_input_exec(
119105
input: &dyn ExecutionPlan,
120106
input_tasks: usize,
121107
) -> Result<Self, DataFusionError> {

0 commit comments

Comments
 (0)