
Commit 92cfe97

Add CoalesceBatchesExec in network shuffles
1 parent 4867a18 commit 92cfe97

File tree

8 files changed, +998 -881 lines changed


src/distributed_planner/distributed_config.rs

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,13 @@ extensions_options! {
     /// - If a node reduces the cardinality of the data, this factor will decrease.
     /// - In any other situation, this factor is left intact.
     pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
+    /// Upon shuffling over the network, data streams need to be disassembled into a lot of output
+    /// partitions, which means the resulting streams might contain a lot of tiny record batches
+    /// to be sent over the wire. This parameter controls the batch size, in number of rows, for
+    /// the CoalesceBatchesExec operator placed at the top of the stage so that bigger batches
+    /// are sent over the wire.
+    /// If set to 0, batch coalescing is disabled on network shuffle operations.
+    pub shuffle_batch_size: usize, default = 8192
     /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
     /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
     pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
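
For context on how the new knob is meant to behave: a nonzero value coalesces record batches up to that many rows before they are shipped, and 0 turns coalescing off entirely. Below is a minimal sketch of that guard, using a hypothetical DistributedConfig stand-in for the struct generated by extensions_options! above (the real struct name is not visible in this hunk):

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;

// Hypothetical stand-in for the options struct generated by `extensions_options!`;
// only the new field is modeled here.
struct DistributedConfig {
    shuffle_batch_size: usize,
}

impl Default for DistributedConfig {
    fn default() -> Self {
        // Mirrors the `default = 8192` declared in the diff above.
        Self { shuffle_batch_size: 8192 }
    }
}

/// Wrap a stage's plan in a CoalesceBatchesExec sized by the option, or leave the
/// plan untouched when coalescing is disabled (shuffle_batch_size == 0).
fn maybe_coalesce(plan: Arc<dyn ExecutionPlan>, cfg: &DistributedConfig) -> Arc<dyn ExecutionPlan> {
    if cfg.shuffle_batch_size == 0 {
        return plan;
    }
    Arc::new(CoalesceBatchesExec::new(plan, cfg.shuffle_batch_size))
}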

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 54 additions & 39 deletions
@@ -12,6 +12,7 @@ use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::error::DataFusionError;
 use datafusion::physical_expr::Partitioning;
 use datafusion::physical_plan::ExecutionPlanProperties;
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::execution_plan::CardinalityEffect;
 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
@@ -300,6 +301,13 @@ fn _apply_network_boundaries(
         return Ok(ctx);
     }
     let task_count = ctx.scale_task_count_and_swap()?;
+    // Network shuffles imply partitioning each data stream into a lot of different partitions,
+    // which means that each resulting stream might contain tiny batches. It's important to
+    // have decently sized batches here, as they will ultimately be sent over the wire, and
+    // the penalty for sending many tiny batches instead of a few big ones is high.
+    if d_cfg.shuffle_batch_size > 0 {
+        ctx.plan = Arc::new(CoalesceBatchesExec::new(ctx.plan, d_cfg.shuffle_batch_size));
+    }
     ctx.plan = Arc::new(NetworkShuffleExec::try_new(ctx.plan, task_count)?);
     return Ok(ctx);
 }
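
To make the arithmetic behind that comment concrete: a single 8192-row batch hashed into, say, 8 partitions across 4 input tasks fans out into 32 streams of roughly 256 rows each, and every one of those slivers would otherwise travel as its own Flight message. A rough, self-contained illustration using plain Arrow APIs (this is not code from the commit, just the shape of the fragmentation that CoalesceBatchesExec undoes):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::compute::concat_batches;
use datafusion::arrow::record_batch::RecordBatch;

fn main() -> datafusion::error::Result<()> {
    // One 8192-row batch entering the shuffle.
    let col: ArrayRef = Arc::new(Int64Array::from_iter_values(0..8192));
    let batch = RecordBatch::try_from_iter(vec![("v", col)])?;

    // Hash-repartitioning into 32 partitions (e.g. 8 partitions x 4 tasks) splits it
    // into ~256-row pieces; an even split is emulated here with slices.
    let pieces: Vec<RecordBatch> = (0..32).map(|i| batch.slice(i * 256, 256)).collect();
    assert!(pieces.iter().all(|b| b.num_rows() == 256));

    // Coalescing buffers such pieces back up towards target_batch_size before they
    // are serialized and sent over the wire.
    let coalesced = concat_batches(&batch.schema(), &pieces)?;
    assert_eq!(coalesced.num_rows(), 8192);
    Ok(())
}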
@@ -552,11 +560,12 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ");
 }
@@ -584,11 +593,12 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ");
 }
@@ -638,11 +648,12 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=4
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=4
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ");
 }
@@ -695,11 +706,12 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ");
 }
@@ -770,22 +782,24 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
-│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
-│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
+│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
+│ CoalesceBatchesExec: target_batch_size=8192
+│ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
 └──────────────────────────────────────────────────
 ┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
-│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
-│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
+│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
+│ CoalesceBatchesExec: target_batch_size=8192
+│ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
 └──────────────────────────────────────────────────
 ");
 }
@@ -832,11 +846,12 @@ mod tests {
 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
-│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
-│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
+│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
 └──────────────────────────────────────────────────
 ");
 }

src/execution_plans/network_shuffle.rs

Lines changed: 22 additions & 10 deletions
@@ -13,6 +13,7 @@ use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use bytes::Bytes;
 use dashmap::DashMap;
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -172,18 +173,29 @@ impl NetworkBoundary for NetworkShuffleExec {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };
 
-        // TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
-        let Some(r_exe) = pending.input.as_any().downcast_ref::<RepartitionExec>() else {
-            return plan_err!("NetworkShuffleExec.input must always be RepartitionExec");
-        };
-
-        let next_stage_plan = Arc::new(RepartitionExec::try_new(
-            require_one_child(r_exe.children())?,
-            scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
-        )?);
+        let transformed = Arc::clone(&pending.input).transform_down(|plan| {
+            if let Some(r_exe) = plan.as_any().downcast_ref::<RepartitionExec>() {
+                // Scale the input RepartitionExec to account for all the tasks to which it will
+                // need to fan data out.
+                let scaled = Arc::new(RepartitionExec::try_new(
+                    require_one_child(r_exe.children())?,
+                    scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
+                )?);
+                Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
+            } else if matches!(plan.output_partitioning(), Partitioning::Hash(_, _)) {
+                // This might be a passthrough node, like a CoalesceBatchesExec. That is fine;
+                // we can leave the node in place and keep descending.
+                Ok(Transformed::no(plan))
+            } else {
+                return plan_err!(
+                    "NetworkShuffleExec input must be hash partitioned, but {} is not",
+                    plan.name()
+                );
+            }
+        })?;
 
         Ok(InputStageInfo {
-            plan: next_stage_plan,
+            plan: transformed.data,
            task_count: pending.input_tasks,
         })
     }
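
Stepping back, the new transform_down block walks the cached input plan from the top: hash-preserving passthrough nodes (such as the CoalesceBatchesExec now placed at the top of the stage) are left alone, the first RepartitionExec found is rebuilt with its partition count multiplied by the task count, and the traversal stops there. The following is a standalone sketch of that pattern using only upstream DataFusion APIs; the crate-local helpers require_one_child and scale_partitioning are replaced with inline equivalents, so details may differ from the actual implementation:

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Multiply the partition count of the first RepartitionExec found below `plan`
/// by `n_tasks`, tolerating hash-partitioned passthrough nodes above it.
fn scale_first_repartition(
    plan: Arc<dyn ExecutionPlan>,
    n_tasks: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    let transformed = plan.transform_down(|node| {
        if let Some(repart) = node.as_any().downcast_ref::<RepartitionExec>() {
            let scaled_partitioning = match repart.partitioning() {
                Partitioning::Hash(exprs, p) => Partitioning::Hash(exprs.clone(), *p * n_tasks),
                other => other.clone(),
            };
            let scaled: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
                Arc::clone(repart.input()),
                scaled_partitioning,
            )?);
            // Only the topmost RepartitionExec is scaled; stop descending here.
            Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
        } else if matches!(node.output_partitioning(), Partitioning::Hash(_, _)) {
            // Hash-preserving passthrough (e.g. CoalesceBatchesExec): keep it and recurse.
            Ok(Transformed::no(node))
        } else {
            Err(DataFusionError::Plan(format!(
                "expected a hash-partitioned input, found {}",
                node.name()
            )))
        }
    })?;
    Ok(transformed.data)
}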

src/test_utils/insta.rs

Lines changed: 1 addition & 0 deletions
@@ -164,6 +164,7 @@ pub fn settings() -> insta::Settings {
     settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
     settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
     settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
+    settings.add_filter(r"output_bytes=\d+.\d [(B)|(Mb)]", "output_bytes=<metric>");
     settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
     settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
     settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");
