Skip to content

Commit 8640f2f

Browse files
committed
Remove redundant batch coalescing
1 parent bb78902 commit 8640f2f

File tree

5 files changed

+167
-246
lines changed

5 files changed

+167
-246
lines changed

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,12 @@ fn _apply_network_boundaries(
310310
}
311311
ctx.plan = Arc::new(NetworkShuffleExec::try_new(ctx.plan, task_count)?);
312312
return Ok(ctx);
313+
} else if let Some(coalesce_batches) = ctx.plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
314+
// If the batch coalescing sits right before the network boundary (i.e. its input is the boundary),
315+
// remove it: we don't want it there, we want it after, and the code that adds it lives a few lines above.
316+
if coalesce_batches.input().is_network_boundary() {
317+
ctx.plan = Arc::clone(coalesce_batches.input());
318+
}
313319
}
314320

315321
// If this is a CoalescePartitionsExec, it means that the original plan is trying to
@@ -556,8 +562,7 @@ mod tests {
556562
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
557563
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
558564
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
559-
│ CoalesceBatchesExec: target_batch_size=8192
560-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
565+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
561566
└──────────────────────────────────────────────────
562567
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
563568
│ CoalesceBatchesExec: target_batch_size=8192
@@ -589,8 +594,7 @@ mod tests {
589594
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
590595
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
591596
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
592-
│ CoalesceBatchesExec: target_batch_size=8192
593-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
597+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
594598
└──────────────────────────────────────────────────
595599
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
596600
│ CoalesceBatchesExec: target_batch_size=8192
@@ -644,8 +648,7 @@ mod tests {
644648
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
645649
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
646650
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
647-
│ CoalesceBatchesExec: target_batch_size=8192
648-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
651+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
649652
└──────────────────────────────────────────────────
650653
┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
651654
│ CoalesceBatchesExec: target_batch_size=8192
@@ -702,8 +705,7 @@ mod tests {
702705
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
703706
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
704707
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
705-
│ CoalesceBatchesExec: target_batch_size=8192
706-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
708+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
707709
└──────────────────────────────────────────────────
708710
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
709711
│ CoalesceBatchesExec: target_batch_size=8192
@@ -772,14 +774,12 @@ mod tests {
772774
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
773775
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
774776
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
775-
│ CoalesceBatchesExec: target_batch_size=8192
776-
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
777+
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
777778
└──────────────────────────────────────────────────
778779
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
779780
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
780781
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
781-
│ CoalesceBatchesExec: target_batch_size=8192
782-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
782+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
783783
└──────────────────────────────────────────────────
784784
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
785785
│ CoalesceBatchesExec: target_batch_size=8192
@@ -842,8 +842,7 @@ mod tests {
842842
└──────────────────────────────────────────────────
843843
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
844844
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
845-
│ CoalesceBatchesExec: target_batch_size=8192
846-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
845+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
847846
└──────────────────────────────────────────────────
848847
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
849848
│ CoalesceBatchesExec: target_batch_size=8192

src/test_utils/insta.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,10 @@ pub fn settings() -> insta::Settings {
164164
settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
165165
settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
166166
settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
167-
settings.add_filter(r"output_bytes=\d+.\d [(B)|(Mb)]", "output_bytes=<metric>");
167+
settings.add_filter(
168+
r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
169+
"output_bytes=<metric>",
170+
);
168171
settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
169172
settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
170173
settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");

tests/distributed_aggregation.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ mod tests {
5959
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
6060
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
6161
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
62-
│ CoalesceBatchesExec: target_batch_size=8192
63-
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
62+
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
6463
└──────────────────────────────────────────────────
6564
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
6665
│ CoalesceBatchesExec: target_batch_size=8192
@@ -142,8 +141,7 @@ mod tests {
142141
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
143142
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
144143
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
145-
│ CoalesceBatchesExec: target_batch_size=8192
146-
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
144+
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
147145
└──────────────────────────────────────────────────
148146
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
149147
│ CoalesceBatchesExec: target_batch_size=8192

0 commit comments

Comments
 (0)