5 changes: 4 additions & 1 deletion benchmarks/cdk/bin/datafusion-bench.ts
@@ -15,6 +15,7 @@ async function main () {
.option('-i, --iterations <number>', 'Number of iterations', '3')
.option('--files-per-task <number>', 'Files per task', '4')
.option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
.option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
.option('--query <number>', 'A specific query to run', undefined)
.parse(process.argv);

@@ -24,6 +25,7 @@ async function main () {
const iterations = parseInt(options.iterations);
const filesPerTask = parseInt(options.filesPerTask);
const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
const shuffleBatchSize = parseInt(options.shuffleBatchSize);

// Compare with previous results first
const results: BenchmarkResults = { queries: [] };
@@ -33,7 +35,8 @@ async function main () {
await query(createTablesSql(sf))
await query(`
SET distributed.files_per_task=${filesPerTask};
SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}
SET distributed.cardinality_task_count_factor=${cardinalityTaskSf};
SET distributed.shuffle_batch_size=${shuffleBatchSize}
`)

for (let id of IDS) {
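The new `--shuffle-batch-size` flag above is forwarded to the engine as a `SET distributed.shuffle_batch_size=...` statement. As a rough illustration only (not part of this PR), the sketch below shows how the same statement could be applied through a plain DataFusion `SessionContext`; it assumes the distributed options are registered as a config extension under the `distributed.` prefix, which is not shown in this diff, and `apply_shuffle_batch_size` is a hypothetical helper name.

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical sketch: apply the same setting the benchmark issues via SQL.
// Assumes the `distributed.` config extension is registered on the context.
async fn apply_shuffle_batch_size(ctx: &SessionContext, rows: usize) -> Result<()> {
    // SET statements are executed eagerly when `sql` plans the statement, so the
    // returned DataFrame can be discarded.
    let _ = ctx
        .sql(&format!("SET distributed.shuffle_batch_size = {rows}"))
        .await?;
    Ok(())
}
```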
7 changes: 7 additions & 0 deletions src/distributed_planner/distributed_config.rs
@@ -24,6 +24,13 @@ extensions_options! {
/// - If a node reduces the cardinality of the data, this factor will decrease.
/// - In any other situation, this factor is left intact.
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
/// When shuffling over the network, data streams are fanned out across many output
/// partitions, so the resulting streams may contain many tiny record batches to be
/// sent over the wire. This parameter controls the batch size, in number of rows, of
/// the CoalesceBatchesExec operator placed at the top of the stage so that bigger
/// batches are sent over the wire.
/// If set to 0, batch coalescing is disabled for network shuffle operations.
pub shuffle_batch_size: usize, default = 8192
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
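The doc comment above describes placing a `CoalesceBatchesExec` at the top of the sending stage whenever `shuffle_batch_size` is non-zero. The actual wiring happens in `distributed_physical_optimizer_rule.rs`, whose diff is not rendered below; the sketch that follows is only a hypothetical illustration of that idea using DataFusion's `CoalesceBatchesExec`, and `maybe_coalesce` is an invented helper name, not code from this PR.

```rust
use std::sync::Arc;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical sketch: how a planner could honor `shuffle_batch_size` when
// building the sending side of a stage.
fn maybe_coalesce(
    plan: Arc<dyn ExecutionPlan>,
    shuffle_batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
    if shuffle_batch_size == 0 {
        // Coalescing disabled: ship record batches exactly as produced.
        plan
    } else {
        // Buffer small batches up to `shuffle_batch_size` rows before they are
        // handed to the network shuffle.
        Arc::new(CoalesceBatchesExec::new(plan, shuffle_batch_size))
    }
}
```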
125 changes: 72 additions & 53 deletions src/distributed_planner/distributed_physical_optimizer_rule.rs

Large diffs are not rendered by default.

32 changes: 22 additions & 10 deletions src/execution_plans/network_shuffle.rs
@@ -13,6 +13,7 @@ use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use bytes::Bytes;
use dashmap::DashMap;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -172,18 +173,29 @@ impl NetworkBoundary for NetworkShuffleExec {
return plan_err!("can only return the wrapped child if in the Pending state");
};

// TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
let Some(r_exe) = pending.input.as_any().downcast_ref::<RepartitionExec>() else {
return plan_err!("NetworkShuffleExec.input must always be RepartitionExec");
};

let next_stage_plan = Arc::new(RepartitionExec::try_new(
require_one_child(r_exe.children())?,
scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
)?);
let transformed = Arc::clone(&pending.input).transform_down(|plan| {
if let Some(r_exe) = plan.as_any().downcast_ref::<RepartitionExec>() {
// Scale the input RepartitionExec to account for all the tasks to which it will
// need to fan data out.
let scaled = Arc::new(RepartitionExec::try_new(
require_one_child(r_exe.children())?,
scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
)?);
Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
} else if matches!(plan.output_partitioning(), Partitioning::Hash(_, _)) {
// This might be a passthrough node, such as a CoalesceBatchesExec, that preserves
// the hash partitioning of its input, so it is fine to leave it in place and keep
// recursing.
Ok(Transformed::no(plan))
} else {
return plan_err!(
"NetworkShuffleExec input must be hash partitioned, but {} is not",
plan.name()
);
}
})?;

Ok(InputStageInfo {
plan: next_stage_plan,
plan: transformed.data,
task_count: pending.input_tasks,
})
}
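The rewritten code above no longer requires the immediate child of `NetworkShuffleExec` to be a `RepartitionExec`; it walks down through hash-partitioned passthrough nodes (such as the newly inserted `CoalesceBatchesExec`) until it finds the `RepartitionExec` to scale. The `scale_partitioning` helper it calls with `|p| p * n_tasks` is defined elsewhere in the crate and is not part of this diff; the sketch below is a hypothetical version shown only to illustrate what scaling a partitioning by the task count means.

```rust
use datafusion::physical_plan::Partitioning;

// Hypothetical sketch of a `scale_partitioning` helper: grow the partition
// count so a single input task can fan data out to every downstream task.
fn scale_partitioning(
    partitioning: &Partitioning,
    scale: impl Fn(usize) -> usize,
) -> Partitioning {
    match partitioning {
        // Hash partitioning keeps its expressions; only the partition count grows.
        Partitioning::Hash(exprs, p) => Partitioning::Hash(exprs.clone(), scale(*p)),
        Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(scale(*p)),
        // Anything else is passed through unchanged in this sketch.
        other => other.clone(),
    }
}
```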
4 changes: 4 additions & 0 deletions src/test_utils/insta.rs
@@ -164,6 +164,10 @@ pub fn settings() -> insta::Settings {
settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
settings.add_filter(
r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
"output_bytes=<metric>",
);
settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");
24 changes: 12 additions & 12 deletions tests/distributed_aggregation.rs
@@ -59,14 +59,14 @@ mod tests {
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
",
);
Expand Down Expand Up @@ -141,14 +141,14 @@ mod tests {
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
",
);