Conversation

@gabotechs (Collaborator) commented Nov 24, 2025

This adds a CoalesceBatchesExec after each NetworkShuffleExec in order to send fewer, bigger batches over the wire.

While performing repartitions over the network, the same data stream might need to be fanned out to a large number of output partitions, producing numerous very small record batches, which take a toll when sent as individual messages in the Arrow Flight stream.

With this change, record batches are coalesced into bigger ones, so that fewer, larger chunks are sent over the wire.
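
For illustration only, here is a minimal sketch of how that wrapping could look with DataFusion's physical plan APIs. This is not the PR's actual code: the NetworkShuffleExec import path, the helper name coalesce_network_shuffles, and where exactly the coalescing lands in the stage planning are assumptions.

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;

// Assumption: NetworkShuffleExec is this repo's shuffle operator; the exact
// module path may differ.
use datafusion_distributed::NetworkShuffleExec;

// Walk the physical plan bottom-up and put a CoalesceBatchesExec directly on
// top of every NetworkShuffleExec, so the tiny batches produced by network
// repartitioning are re-chunked to roughly `target_batch_size` rows.
fn coalesce_network_shuffles(
    plan: Arc<dyn ExecutionPlan>,
    target_batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|node| {
        if node.as_any().downcast_ref::<NetworkShuffleExec>().is_some() {
            let wrapped: Arc<dyn ExecutionPlan> =
                Arc::new(CoalesceBatchesExec::new(node, target_batch_size));
            Ok(Transformed::yes(wrapped))
        } else {
            Ok(Transformed::no(node))
        }
    })
    .map(|t| t.data)
}

CoalesceBatchesExec buffers incoming batches and emits one only after roughly target_batch_size rows have accumulated, which is what keeps the Arrow Flight messages few and large.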


Benchmark results

These benchmarks were run on a cluster of 4 t3.xlarge machines against TPCH data stored in Parquet files in S3, with a scale factor of 10:

npm run datafusion-bench -- --sf 10 --files-per-task 4 --shuffle-batch-size 0 

# VS

npm run datafusion-bench -- --sf 10 --files-per-task 4 --shuffle-batch-size 8192
==== Comparison with previous run ====
      q1: prev=5614 ms, new=4222 ms, 1.33x faster ✅
      q2: prev=1393 ms, new=1202 ms, 1.16x faster ✔
      q3: prev=3467 ms, new=2757 ms, 1.26x faster ✅
      q4: prev=2826 ms, new=2206 ms, 1.28x faster ✅
      q5: prev=4503 ms, new=3856 ms, 1.17x faster ✔
      q6: prev=2183 ms, new=2094 ms, 1.04x faster ✔
      q7: prev=6073 ms, new=5287 ms, 1.15x faster ✔
      q8: prev=5891 ms, new=4835 ms, 1.22x faster ✅
      q9: prev=7189 ms, new=6498 ms, 1.11x faster ✔
     q10: prev=3378 ms, new=2965 ms, 1.14x faster ✔
     q11: prev=1533 ms, new=1366 ms, 1.12x faster ✔
     q12: prev=2730 ms, new=2677 ms, 1.02x faster ✔
     q13: prev=3499 ms, new=2989 ms, 1.17x faster ✔
     q14: prev=2170 ms, new=2236 ms, 1.03x slower ✖
     q15: prev= 107 ms, new= 106 ms, 1.01x faster ✔
     q16: prev=1008 ms, new= 967 ms, 1.04x faster ✔
     q17: prev=6269 ms, new=5813 ms, 1.08x faster ✔
     q18: prev=5878 ms, new=5030 ms, 1.17x faster ✔
     q19: prev=2711 ms, new=2861 ms, 1.06x slower ✖
     q20: prev=2791 ms, new=2916 ms, 1.04x slower ✖
     q21: prev=9742 ms, new=6927 ms, 1.41x faster ✅
     q22: prev= 967 ms, new= 922 ms, 1.05x faster ✔

@gabotechs (Collaborator, Author) commented:

🤔 Probably there's a better default; I'll play around with it and try some different numbers.

// which means that each resulting stream might contain tiny batches. It's important to
// have decent sized batches here as this will ultimately be sent over the wire, and the
// penalty there for sending many tiny batches instead of few big ones is big.
// TODO: After https://github.com/apache/datafusion/issues/18782 is shipped, the batching
Collaborator:
nice TODO :)

@geoffreyclaude (Collaborator) left a comment:

Having #243 to track the removal of this rule is great!

@gabotechs merged commit f6dfaa6 into main on Nov 27, 2025
4 checks passed
@gabotechs deleted the gabrielmusat/add-batch-coalescing-on-shuffles branch on November 27, 2025 at 10:39