Skip to content

Commit a99ff6c

Browse files
committed
Allow specifying only 1 task for network boundaries, and collapse them into a single-node plan
1 parent 471c476 commit a99ff6c

File tree

4 files changed

+382
-371
lines changed

4 files changed

+382
-371
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::util::{
2525
};
2626
use async_trait::async_trait;
2727
use datafusion::arrow::record_batch::RecordBatch;
28-
use datafusion::arrow::util::pretty::{self, pretty_format_batches};
28+
use datafusion::arrow::util::pretty::pretty_format_batches;
2929
use datafusion::common::instant::Instant;
3030
use datafusion::common::tree_node::{Transformed, TreeNode};
3131
use datafusion::common::utils::get_available_parallelism;
@@ -103,7 +103,11 @@ pub struct RunOpt {
103103

104104
/// Number of tasks to use for network shuffle boundaries. Defaults to the number of workers.
105105
#[structopt(long)]
106-
max_tasks: Option<usize>,
106+
shuffle_tasks: Option<usize>,
107+
108+
/// Number of tasks to use for network coalesce boundaries. Defaults to the number of workers.
109+
#[structopt(long)]
110+
coalesce_tasks: Option<usize>,
107111

108112
/// Spawns a worker in the specified port.
109113
#[structopt(long)]
@@ -141,10 +145,9 @@ impl DistributedSessionBuilder for RunOpt {
141145
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
142146
}
143147
if !self.workers.is_empty() {
144-
let tasks = self.max_tasks.unwrap_or(self.workers.len());
145148
let rule = DistributedPhysicalOptimizerRule::new()
146-
.with_network_coalesce_tasks(tasks)
147-
.with_network_shuffle_tasks(tasks);
149+
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
150+
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
148151
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
149152
}
150153

@@ -325,11 +328,6 @@ impl RunOpt {
325328
"=== Physical plan with metrics ===\n{}\n",
326329
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
327330
);
328-
if !result.is_empty() {
329-
// do not call print_batches if there are no batches as the result is confusing
330-
// and makes it look like there is a batch with no columns
331-
pretty::print_batches(&result)?;
332-
}
333331
}
334332
Ok((result, n_tasks))
335333
}

src/distributed_physical_optimizer_rule.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,12 @@ impl DistributedPhysicalOptimizerRule {
240240
// 1 task, there's no point in having a network boundary in between, they can just
241241
// communicate in memory.
242242
if n_tasks == 1 && in_tasks == 1 {
243-
return Ok(Transformed::no(dnode.rollback()?));
243+
let mut n = dnode.rollback()?;
244+
if let Some(node) = n.as_any().downcast_ref::<PartitionIsolatorExec>() {
245+
// Also trim PartitionIsolatorExec out of the plan.
246+
n = Arc::clone(node.children().first().unwrap());
247+
}
248+
return Ok(Transformed::yes(n));
244249
}
245250
match Self::_distribute_plan_inner(query_id, inner_plan, num, depth + 1, in_tasks) {
246251
Ok(v) => break v,

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ impl ExecutionPlan for NetworkShuffleExec {
243243

244244
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
245245
match self {
246-
NetworkShuffleExec::Pending(v) => v.repartition_exec.children(),
246+
NetworkShuffleExec::Pending(v) => vec![&v.repartition_exec],
247247
NetworkShuffleExec::Ready(_) => vec![],
248248
}
249249
}

0 commit comments

Comments
 (0)