Commit f6dfaa6

Add batch coalescing in NetworkShuffleExec operations (#242)
* Add CoalesceBatchExec in network shuffles
* Add param to CDK benchmarks
* Remove redundant batch coalescing
* Add comment for the future
1 parent: 44633c9

9 files changed: +1030 −984 lines

benchmarks/cdk/bin/datafusion-bench.ts

Lines changed: 4 additions & 1 deletion
@@ -15,6 +15,7 @@ async function main () {
     .option('-i, --iterations <number>', 'Number of iterations', '3')
     .option('--files-per-task <number>', 'Files per task', '4')
     .option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
+    .option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
     .option('--query <number>', 'A specific query to run', undefined)
     .parse(process.argv);

@@ -24,6 +25,7 @@ async function main () {
   const iterations = parseInt(options.iterations);
   const filesPerTask = parseInt(options.filesPerTask);
   const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
+  const shuffleBatchSize = parseInt(options.shuffleBatchSize);

   // Compare with previous results first
   const results: BenchmarkResults = { queries: [] };

@@ -33,7 +35,8 @@ async function main () {
   await query(createTablesSql(sf))
   await query(`
     SET distributed.files_per_task=${filesPerTask};
-    SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}
+    SET distributed.cardinality_task_count_factor=${cardinalityTaskSf};
+    SET distributed.shuffle_batch_size=${shuffleBatchSize}
   `)

   for (let id of IDS) {

src/distributed_planner/distributed_config.rs

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,13 @@ extensions_options! {
     /// - If a node reduces the cardinality of the data, this factor will decrease.
     /// - In any other situation, this factor is left intact.
     pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
+    /// Upon shuffling over the network, data streams need to be disassembled in a lot of output
+    /// partitions, which means the resulting streams might contain a lot of tiny record batches
+    /// to be sent over the wire. This parameter controls the batch size in number of rows for
+    /// the CoalesceBatchExec operator that is placed at the top of the stage for sending bigger
+    /// batches over the wire.
+    /// If set to 0, batch coalescing is disabled on network shuffle operations.
+    pub shuffle_batch_size: usize, default = 8192
     /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
     /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
     pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
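
The sketch below only illustrates the behavior this option describes; it is not the project's optimizer code, and the maybe_coalesce_shuffle_output helper name is made up. Assuming the planner wraps the plan feeding a network shuffle in DataFusion's CoalesceBatchesExec when shuffle_batch_size is non-zero, and leaves it untouched when the option is 0, the logic would look roughly like this:

    // Illustrative only (assumed helper, not part of this commit): place a CoalesceBatchesExec
    // on top of the plan that feeds a network shuffle, so many tiny record batches are merged
    // into batches of up to `shuffle_batch_size` rows before being sent over the wire.
    use std::sync::Arc;

    use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
    use datafusion::physical_plan::ExecutionPlan;

    fn maybe_coalesce_shuffle_output(
        stage_output: Arc<dyn ExecutionPlan>,
        shuffle_batch_size: usize,
    ) -> Arc<dyn ExecutionPlan> {
        if shuffle_batch_size == 0 {
            // 0 disables batch coalescing on network shuffle operations.
            stage_output
        } else {
            Arc::new(CoalesceBatchesExec::new(stage_output, shuffle_batch_size))
        }
    }

In the snapshot diffs below (tests/distributed_aggregation.rs), this is where the CoalesceBatchesExec: target_batch_size=8192 node now appears: at the top of Stage 1, above its RepartitionExec, instead of on the consumer side of the shuffle.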

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 72 additions & 53 deletions
Large diffs are not rendered by default.

src/execution_plans/network_shuffle.rs

Lines changed: 22 additions & 10 deletions
@@ -13,6 +13,7 @@ use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use bytes::Bytes;
 use dashmap::DashMap;
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};

@@ -172,18 +173,29 @@ impl NetworkBoundary for NetworkShuffleExec {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };

-        // TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
-        let Some(r_exe) = pending.input.as_any().downcast_ref::<RepartitionExec>() else {
-            return plan_err!("NetworkShuffleExec.input must always be RepartitionExec");
-        };
-
-        let next_stage_plan = Arc::new(RepartitionExec::try_new(
-            require_one_child(r_exe.children())?,
-            scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
-        )?);
+        let transformed = Arc::clone(&pending.input).transform_down(|plan| {
+            if let Some(r_exe) = plan.as_any().downcast_ref::<RepartitionExec>() {
+                // Scale the input RepartitionExec to account for all the tasks to which it will
+                // need to fan data out.
+                let scaled = Arc::new(RepartitionExec::try_new(
+                    require_one_child(r_exe.children())?,
+                    scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
+                )?);
+                Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
+            } else if matches!(plan.output_partitioning(), Partitioning::Hash(_, _)) {
+                // This might be a passthrough node, like a CoalesceBatchesExec or something like that.
+                // This is fine, we can let the node be here.
+                Ok(Transformed::no(plan))
+            } else {
+                return plan_err!(
+                    "NetworkShuffleExec input must be hash partitioned, but {} is not",
+                    plan.name()
+                );
+            }
+        })?;

         Ok(InputStageInfo {
-            plan: next_stage_plan,
+            plan: transformed.data,
             task_count: pending.input_tasks,
         })
     }
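
For context on the scaling step above, here is a rough sketch under the assumption that scale_partitioning applies the |p| p * n_tasks closure to the partition count of a hash partitioning; the scale_hash_partitioning function below is illustrative only, not the crate's helper:

    // Hedged sketch: scaling a hash partitioning by the consumer task count. The hash keys
    // stay the same; only the partition count is multiplied so that each of the `n_tasks`
    // downstream tasks receives its own slice of the hash space.
    use datafusion::physical_plan::Partitioning;

    fn scale_hash_partitioning(partitioning: &Partitioning, n_tasks: usize) -> Partitioning {
        match partitioning {
            Partitioning::Hash(exprs, p) => Partitioning::Hash(exprs.clone(), *p * n_tasks),
            // Non-hash partitionings are left untouched in this sketch.
            other => other.clone(),
        }
    }

Keeping the hash expressions intact means rows still route by the same key; only the fan-out width of the RepartitionExec changes, which is why passthrough nodes that preserve hash partitioning (such as CoalesceBatchesExec) can be tolerated between the shuffle boundary and the RepartitionExec.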

src/test_utils/insta.rs

Lines changed: 4 additions & 0 deletions
@@ -164,6 +164,10 @@ pub fn settings() -> insta::Settings {
     settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
     settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
     settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
+    settings.add_filter(
+        r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
+        "output_bytes=<metric>",
+    );
     settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
     settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
     settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");

tests/distributed_aggregation.rs

Lines changed: 12 additions & 12 deletions
@@ -59,14 +59,14 @@ mod tests {
 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
+│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ",
 );

@@ -141,14 +141,14 @@ mod tests {
 ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
+│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ",
 );
