Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::util::{
};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::{self, pretty_format_batches};
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::common::instant::Instant;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::utils::get_available_parallelism;
Expand Down Expand Up @@ -102,8 +102,12 @@ pub struct RunOpt {
sorted: bool,

/// Number of partitions per task.
#[structopt(long = "ppt")]
partitions_per_task: Option<usize>,
#[structopt(long)]
shuffle_tasks: Option<usize>,

/// Number of partitions per task.
#[structopt(long)]
coalesce_tasks: Option<usize>,

/// Spawns a worker in the specified port.
#[structopt(long)]
Expand Down Expand Up @@ -141,10 +145,9 @@ impl DistributedSessionBuilder for RunOpt {
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
}
if !self.workers.is_empty() {
let mut rule = DistributedPhysicalOptimizerRule::new();
if let Some(partitions_per_task) = self.partitions_per_task {
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
}
let rule = DistributedPhysicalOptimizerRule::new()
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
}

Expand Down Expand Up @@ -325,11 +328,6 @@ impl RunOpt {
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
);
if !result.is_empty() {
// do not call print_batches if there are no batches as the result is confusing
// and makes it look like there is a batch with no columns
pretty::print_batches(&result)?;
}
}
Ok((result, n_tasks))
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod composed_extension_codec;
mod partitioning;
#[allow(unused)]
pub mod ttl_map;

pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};
25 changes: 25 additions & 0 deletions src/common/partitioning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::PlanProperties;

pub fn scale_partitioning_props(
props: &PlanProperties,
f: impl FnOnce(usize) -> usize,
) -> PlanProperties {
PlanProperties::new(
props.eq_properties.clone(),
scale_partitioning(&props.partitioning, f),
props.emission_type,
props.boundedness,
)
}

pub fn scale_partitioning(
partitioning: &Partitioning,
f: impl FnOnce(usize) -> usize,
) -> Partitioning {
match &partitioning {
Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(f(*p)),
Partitioning::Hash(hash, p) => Partitioning::Hash(hash.clone(), f(*p)),
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
}
}
10 changes: 10 additions & 0 deletions src/config_extension_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use datafusion::common::{internal_datafusion_err, DataFusionError};
use datafusion::config::ConfigExtension;
use datafusion::execution::TaskContext;
use datafusion::prelude::SessionConfig;
use http::{HeaderMap, HeaderName};
use std::error::Error;
Expand Down Expand Up @@ -79,6 +80,15 @@ impl ContextGrpcMetadata {
}
self
}

pub fn headers_from_ctx(ctx: &Arc<TaskContext>) -> HeaderMap {
ctx.session_config()
.get_extension::<ContextGrpcMetadata>()
.as_ref()
.map(|v| v.as_ref().clone())
.unwrap_or_default()
.0
}
}

#[cfg(test)]
Expand Down
Loading