Skip to content

Commit fbfc512

Browse files
authored
Distributed planning overhaul (#145)
* Refactor ArrowFlightReadExec
* Refactor do_get.rs
* Introduce new NetworkHashShuffleExec and NetworkCoalesceTasksExec
* Refactor to NetworkShuffleExec and NetworkCoalesceExec
* Add docs for the new ExecutionPlan implementations
* Increase max_decoding_message_size
* Allow specifying only 1 task for network boundaries, and collapsing it single-node
* Bring back graphviz visualization
* Fix remaining tests
* Rollback hardcoded client decoded size
* Rename ArrowFlightReadExec to NetworkShuffleExec
* Collapse nested if let Some() = ... blocks in distributed_physical_optimizer_rule.rs
* Better display of partitions in network shuffle tasks
* Add some tests to partition_isolator.rs
* Update comments in benchmark arguments
1 parent e428e7f commit fbfc512

28 files changed

+2644
-2096
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::util::{
2525
};
2626
use async_trait::async_trait;
2727
use datafusion::arrow::record_batch::RecordBatch;
28-
use datafusion::arrow::util::pretty::{self, pretty_format_batches};
28+
use datafusion::arrow::util::pretty::pretty_format_batches;
2929
use datafusion::common::instant::Instant;
3030
use datafusion::common::tree_node::{Transformed, TreeNode};
3131
use datafusion::common::utils::get_available_parallelism;
@@ -101,9 +101,27 @@ pub struct RunOpt {
101101
#[structopt(short = "t", long = "sorted")]
102102
sorted: bool,
103103

104-
/// Number of partitions per task.
105-
#[structopt(long = "ppt")]
106-
partitions_per_task: Option<usize>,
104+
/// When shuffling data, this defines how many tasks are employed to perform the shuffle.
105+
/// ```text
106+
///  ( task 1 )  ( task 2 ) ( task 3 )
107+
///      ▲           ▲          ▲
108+
///      └────┬──────┴─────┬────┘
109+
///       ( task 1 )   ( task 2 )       N tasks
110+
/// ```
111+
/// This parameter defines N
112+
#[structopt(long)]
113+
shuffle_tasks: Option<usize>,
114+
115+
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
116+
/// ```text
117+
///              ( task 1 )
118+
///                  ▲
119+
///      ┌───────────┴──────────┐
120+
///  ( task 1 )  ( task 2 ) ( task 3 )  N tasks
121+
/// ```
122+
/// This parameter defines N
123+
#[structopt(long)]
124+
coalesce_tasks: Option<usize>,
107125

108126
/// Spawns a worker on the specified port.
109127
#[structopt(long)]
@@ -141,10 +159,9 @@ impl DistributedSessionBuilder for RunOpt {
141159
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
142160
}
143161
if !self.workers.is_empty() {
144-
let mut rule = DistributedPhysicalOptimizerRule::new();
145-
if let Some(partitions_per_task) = self.partitions_per_task {
146-
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
147-
}
162+
let rule = DistributedPhysicalOptimizerRule::new()
163+
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
164+
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
148165
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
149166
}
150167

@@ -325,11 +342,6 @@ impl RunOpt {
325342
"=== Physical plan with metrics ===\n{}\n",
326343
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
327344
);
328-
if !result.is_empty() {
329-
// do not call print_batches if there are no batches as the result is confusing
330-
// and makes it look like there is a batch with no columns
331-
pretty::print_batches(&result)?;
332-
}
333345
}
334346
Ok((result, n_tasks))
335347
}

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod composed_extension_codec;
2+
mod partitioning;
23
#[allow(unused)]
34
pub mod ttl_map;
45

56
pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;
7+
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};

src/common/partitioning.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use datafusion::physical_expr::Partitioning;
2+
use datafusion::physical_plan::PlanProperties;
3+
4+
pub fn scale_partitioning_props(
5+
props: &PlanProperties,
6+
f: impl FnOnce(usize) -> usize,
7+
) -> PlanProperties {
8+
PlanProperties::new(
9+
props.eq_properties.clone(),
10+
scale_partitioning(&props.partitioning, f),
11+
props.emission_type,
12+
props.boundedness,
13+
)
14+
}
15+
16+
pub fn scale_partitioning(
17+
partitioning: &Partitioning,
18+
f: impl FnOnce(usize) -> usize,
19+
) -> Partitioning {
20+
match &partitioning {
21+
Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(f(*p)),
22+
Partitioning::Hash(hash, p) => Partitioning::Hash(hash.clone(), f(*p)),
23+
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
24+
}
25+
}

src/config_extension_ext.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use datafusion::common::{internal_datafusion_err, DataFusionError};
22
use datafusion::config::ConfigExtension;
3+
use datafusion::execution::TaskContext;
34
use datafusion::prelude::SessionConfig;
45
use http::{HeaderMap, HeaderName};
56
use std::error::Error;
@@ -79,6 +80,15 @@ impl ContextGrpcMetadata {
7980
}
8081
self
8182
}
83+
84+
pub fn headers_from_ctx(ctx: &Arc<TaskContext>) -> HeaderMap {
85+
ctx.session_config()
86+
.get_extension::<ContextGrpcMetadata>()
87+
.as_ref()
88+
.map(|v| v.as_ref().clone())
89+
.unwrap_or_default()
90+
.0
91+
}
8292
}
8393

8494
#[cfg(test)]

0 commit comments

Comments
 (0)