
Conversation

@andygrove (Member) commented Jan 17, 2026

Summary

This PR implements a sort-based shuffle mechanism for Ballista, similar to Spark's approach. Instead of creating one file per output partition (N×M files for N input partitions and M output partitions), each input partition writes to a single consolidated file with an index file mapping partition IDs to batch ranges.

Related Issues

  • Investigate use of comet shuffle

Key Features

  • Consolidated output files: Each input partition produces one data file + one index file (2×N files instead of N×M)
  • Arrow IPC File format: Uses FileWriter/FileReader for efficient random access to specific record batches
  • Memory management: Configurable buffer sizes with spill-to-disk support to prevent OOM
  • Backward compatible: Sort-based shuffle is opt-in via configuration; default remains hash-based shuffle

Architecture

ballista/core/src/execution_plans/sort_shuffle/
├── mod.rs          # Module exports
├── config.rs       # SortShuffleConfig with buffer/memory settings
├── buffer.rs       # PartitionBuffer for in-memory buffering
├── spill.rs        # SpillManager for disk spilling
├── index.rs        # ShuffleIndex for batch range tracking
├── writer.rs       # SortShuffleWriterExec execution plan
└── reader.rs       # Reading logic with FileReader random access
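
As a rough sketch of how reader.rs locates a single partition (illustrative only; the exact index layout and the names below are assumptions based on the commit notes later in this thread, which describe an index of cumulative batch counts stored as little-endian i64 values and a FileReader positioned via set_index()):

```rust
use std::fs::File;
use std::path::Path;

use arrow::error::Result;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;

/// Read the batch range for one output partition from the index file.
/// Assumes N+1 cumulative batch counts as little-endian i64, so that
/// counts[p]..counts[p+1] is the range for output partition p.
fn read_batch_range(index_path: &Path, partition_id: usize) -> std::io::Result<(usize, usize)> {
    let bytes = std::fs::read(index_path)?;
    let counts: Vec<i64> = bytes
        .chunks_exact(8)
        .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
        .collect();
    Ok((counts[partition_id] as usize, counts[partition_id + 1] as usize))
}

/// Seek directly to the partition's batches using the IPC footer instead of
/// scanning through all preceding batches in the data file.
fn read_partition(data_path: &Path, start: usize, end: usize) -> Result<Vec<RecordBatch>> {
    let mut reader = FileReader::try_new(File::open(data_path)?, None)?;
    reader.set_index(start)?;
    reader.take(end - start).collect()
}
```

(The merged implementation streams these batches lazily rather than collecting them into a Vec; see the review discussion below.)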

Configuration Options

Option                                        Type   Default  Description
ballista.shuffle.sort_based.enabled           bool   false    Enable sort-based shuffle
ballista.shuffle.sort_based.buffer_size       bytes  1MB      Per-partition buffer size
ballista.shuffle.sort_based.memory_limit      bytes  256MB    Total memory limit for buffers
ballista.shuffle.sort_based.spill_threshold   float  0.8      Spill when memory exceeds this fraction
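
For example, enabling the sort-based shuffle from client code looks like this (mirroring the integration-test snippet quoted in the review below; the import paths are assumptions):

```rust
use ballista::prelude::*; // assumed to export the BALLISTA_* config keys
use datafusion::prelude::SessionConfig;

let config = SessionConfig::new_with_ballista()
    .set_str(BALLISTA_SHUFFLE_SORT_BASED_ENABLED, "true")
    .set_str(BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, "1048576") // 1MB
    .set_str(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, "268435456"); // 256MB
```

With the defaults above, buffers start spilling to disk once total buffered memory exceeds 0.8 × 256MB ≈ 205MB.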

Changes

Core

  • Added sort_shuffle module with all components (buffer, spill, index, writer, reader)
  • Added ShuffleWriter trait for polymorphic shuffle writer handling
  • Updated shuffle_reader.rs to detect and read sort-based shuffle format
  • Uses Arrow IPC File format (FileWriter/FileReader) for efficient random access

Executor

  • Updated DefaultExecutionEngine to handle both ShuffleWriterExec and SortShuffleWriterExec
  • Added ShuffleWriterVariant enum for type-safe dispatch
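
A minimal sketch of the dispatch shape this implies (illustrative only; the PR's actual variant names and payload types may differ):

```rust
use std::sync::Arc;

// Placeholder types standing in for Ballista's actual execution plans.
struct ShuffleWriterExec;
struct SortShuffleWriterExec;

/// Type-safe dispatch over the two writer implementations, so the execution
/// engine can match on the variant instead of downcasting trait objects.
enum ShuffleWriterVariant {
    Hash(Arc<ShuffleWriterExec>),
    Sort(Arc<SortShuffleWriterExec>),
}

fn describe(writer: &ShuffleWriterVariant) -> &'static str {
    match writer {
        ShuffleWriterVariant::Hash(_) => "hash-based shuffle writer",
        ShuffleWriterVariant::Sort(_) => "sort-based shuffle writer",
    }
}
```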

Scheduler/Planner

  • Updated DistributedPlanner to create SortShuffleWriterExec when enabled
  • Updated ExecutionStageBuilder to use ShuffleWriter trait

Benchmarks

  • Added shuffle_bench binary for comparing hash vs sort shuffle performance

Benchmark Results

Configuration: 1M rows, 4 input partitions, 16 output partitions

=== Hash-Based Shuffle ===
  Average time: 57.94ms
  Files created: 44
  Total size: 13077 KB
  Throughput: 493.81 MB/s

=== Sort-Based Shuffle ===
  Average time: 44.14ms
  Files created: 8
  Total size: 13101 KB
  Throughput: 648.14 MB/s

Performance Improvements:

  • 24% faster execution time (57.94ms → 44.14ms)
  • 82% fewer files (44 files → 8 files)
  • 31% higher throughput (493.81 MB/s → 648.14 MB/s)

Test Plan

  • Unit tests for all new components
  • All 43 ballista-core tests pass
  • All 15 ballista-executor tests pass
  • All 50 ballista-scheduler tests pass
  • 17 end-to-end integration tests (ballista/client/tests/sort_shuffle.rs)
  • Comparison tests verifying sort shuffle produces same results as hash shuffle

Running the Benchmark

# Default settings (1M rows, 16 output partitions)
cargo run --release --bin shuffle_bench

# Custom settings
cargo run --release --bin shuffle_bench -- --rows 1000000 --partitions 32 --input-partitions 8

# Run only sort shuffle  
cargo run --release --bin shuffle_bench -- --sort-only

References

This implementation is inspired by:

  • Apache DataFusion Comet's shuffle writer: https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/shuffle/shuffle_writer.rs

🤖 Generated with Claude Code

andygrove and others added 5 commits January 17, 2026 10:36

This adds an alternative shuffle implementation that writes a single
consolidated file per input partition (sorted by output partition ID)
along with an index file, similar to Apache Spark's sort-based shuffle.

This approach reduces file count from N × M (N input partitions ×
M output partitions) to 2 × N files (one data + one index file per
input partition).

Key components:
- SortShuffleWriterExec: ExecutionPlan implementation
- PartitionBuffer: In-memory buffering per partition
- SpillManager: Spill-to-disk when memory pressure is high
- ShuffleIndex: Index file I/O (little-endian i64 offsets)
- SortShuffleConfig: Configuration (buffer size, memory limit, spill threshold)

New configuration options:
- ballista.shuffle.sort_based.enabled (default: false)
- ballista.shuffle.sort_based.buffer_size (default: 1MB)
- ballista.shuffle.sort_based.memory_limit (default: 256MB)
- ballista.shuffle.sort_based.spill_threshold (default: 0.8)

This implementation is inspired by Apache DataFusion Comet's shuffle writer:
https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/shuffle/shuffle_writer.rs

Work remaining:
- Update DistributedPlanner to use SortShuffleWriterExec when enabled
- Update ShuffleReaderExec to detect and read sort shuffle format
- Add protobuf serialization for the new execution plan
- End-to-end integration tests

Co-Authored-By: Claude Opus 4.5 <[email protected]>

This commit integrates the sort-based shuffle writer with the distributed
planner and execution graph:

- Add ShuffleWriter trait to provide a common interface for both
  ShuffleWriterExec and SortShuffleWriterExec
- Update DistributedPlanner to return Vec<Arc<dyn ShuffleWriter>>
- Update ExecutionStageBuilder to work with ShuffleWriter trait
- Add protobuf serialization for SortShuffleWriterExec
- Update execution_graph, execution_stage, and diagram modules

The planner will now create SortShuffleWriterExec when:
- ballista.shuffle.sort_based.enabled is true
- The partitioning is Hash partitioning

Co-Authored-By: Claude Opus 4.5 <[email protected]>

…chmarks

- Update fetch_partition_local to detect sort-based shuffle format
  by checking for the presence of an index file
- When sort shuffle is detected, read partition data using the
  index file to locate the correct byte range
- Add shuffle_bench binary for comparing hash vs sort shuffle performance
  - Configurable number of rows, partitions, batch size
  - Reports timing, file count, and throughput metrics

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Update DefaultExecutionEngine to handle both ShuffleWriterExec and
  SortShuffleWriterExec by introducing ShuffleWriterVariant enum
- Fix sort shuffle writer to store cumulative batch counts in index
  instead of placeholder byte offsets
- Fix sort shuffle reader to correctly identify partition boundaries
  using the batch count index
- Add comprehensive end-to-end integration tests for sort-based shuffle
  covering aggregations, GROUP BY, ORDER BY, and comparison with
  hash-based shuffle

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Update writer to use FileWriter instead of StreamWriter
- Update reader to use FileReader with set_index() for direct batch access
- FileReader supports random access via the IPC footer, eliminating the
  need to scan through preceding batches to reach the target partition
- Index still stores cumulative batch counts for partition boundaries

This improves read performance for later partitions (e.g., partition 15
of 16) by directly seeking to the needed batches instead of reading
and discarding all preceding data.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

@andygrove changed the title from "[WIP] feat: Add sort-based shuffle writer" to "feat: Add sort-based shuffle implementation" on Jan 17, 2026

andygrove and others added 2 commits January 17, 2026 12:04

Use is_multiple_of() instead of manual modulo check.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Add FinalizeResult type alias to reduce type complexity
- Add #[allow(clippy::too_many_arguments)] for finalize_output function
- Use iter_mut().enumerate() instead of index-based loop

Co-Authored-By: Claude Opus 4.5 <[email protected]>

@andygrove marked this pull request as ready for review on January 17, 2026 19:11
@andygrove requested a review from @milenkovicm on January 17, 2026 19:11

andygrove and others added 4 commits January 17, 2026 14:35

When running the tpch benchmark, the --query parameter is now optional.
If not specified, all 22 TPC-H queries will be run sequentially.

Changes:
- Make --query optional for both datafusion and ballista benchmarks
- Run all 22 queries when --query is not specified
- Only print SQL queries when --debug flag is enabled
- Write a single JSON output file for the entire benchmark run
- Fix parquet file path resolution for datafusion benchmarks
- Simplify output when iterations=1 (no iteration number, no average)

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Add CLI option to enable sort-based shuffle in the TPC-H benchmark
when running against Ballista.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

@milenkovicm (Contributor) commented:

I'll review this tomorrow morning

@milenkovicm (Contributor) left a comment:

Thanks @andygrove,

I had a quick look, and I think this approach makes sense. I'm not sure it will be faster than hash-based shuffle for big shuffles, but the benchmark will definitely help us understand that.

Two groups of comments:

  • I would suggest returning a stream of record batches rather than a vector of record batches.
  • Remote shuffle read does not work at the moment; we probably need to update the flight server to support the new file format.

With #1318 in mind, I'm not sure if/how we can avoid decompressing and decoding the result file on the mapper side. Does it have to be a valid IPC file? Just an idea for discussion.

//! This module provides an alternative to the hash-based shuffle that writes
//! a single consolidated file per input partition (sorted by output partition ID)
//! along with an index file mapping partition IDs to byte offsets.
//!

@milenkovicm:

Maybe we could add a paragraph from the referenced Spark documentation; I personally find it a good algorithm outline: "Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks."

let config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_SHUFFLE_SORT_BASED_ENABLED, "true")
.set_str(BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, "1048576") // 1MB
.set_str(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, "268435456"); // 256MB

@milenkovicm:

Adding .set_str(BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, "true") will force a remote read using flight. At the moment, the tests will fail if it is set. I would suggest using rstest to test both local and remote reads (a context with and without this option set).

let config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_SHUFFLE_SORT_BASED_ENABLED, "true")
.set_str(BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, "1048576") // 1MB
.set_str(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, "268435456"); // 256MB

@milenkovicm:

Also, one note: #1318 introduces block transfer rather than flight transfer; to override it and fall back to flight transport, .set_str(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, "true") is needed.

I believe we would need to patch the flight server to support transport of sorted shuffles.


//! Sort-based shuffle implementation for Ballista.
//!
//! This module provides an alternative to the hash-based shuffle that writes

@milenkovicm:

I would suggest changing from:

"This module provides an alternative to the hash-based shuffle that writes" to
"This module provides an alternative to the hash-based shuffle. It writes"

I personally find it easier to understand, but it's a personal opinion.

data_path: &Path,
index_path: &Path,
partition_id: usize,
) -> Result<Vec<RecordBatch>> {

@milenkovicm:

Would it make sense to return a stream instead of loading all batches into a single vector?

///
/// # Returns
/// Vector of all record batches in the file.
pub fn read_all_batches(data_path: &Path) -> Result<Vec<RecordBatch>> {

@milenkovicm:

Would it make sense to return a stream instead of loading all batches into a single vector?

}

/// Reads all spill files for a partition and returns the batches.
pub fn read_spill_files(&self, partition_id: usize) -> Result<Vec<RecordBatch>> {

@milenkovicm:

Would it make sense to avoid the vector and use a stream?

@milenkovicm:

Do we need to keep this as a vector? Perhaps just return a StreamReader?

reader.schema()
};

let stream = futures::stream::iter(batches.into_iter().map(Ok));

@milenkovicm:

IMHO, getting a stream from the reader instead of all batches may make sense.

@milenkovicm (Contributor) commented:

One note: maybe it would make sense to test with BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ=true as well, as it would represent the worst-case scenario.

andygrove and others added 3 commits January 18, 2026 09:20

…eader

Change sort shuffle partition reading to return a lazy stream that yields
batches on-demand rather than collecting all batches into memory upfront.
This reduces memory usage for large partitions.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Add algorithm description from Apache Spark documentation and improve
wording for clarity.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Update the flight server's do_get handler to detect sort-based shuffle
files and read only the relevant partition using the index file. This
enables remote shuffle reads for sort-based shuffle.

Also add detection in IO_BLOCK_TRANSPORT to return an informative error
since block transport doesn't support sort-based shuffle (it transfers
the entire file which contains all partitions).

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Use rstest to parameterize sort shuffle tests to run with both local
reads and remote reads via the flight service. This ensures the flight
service correctly handles sort-based shuffle partition requests.

Tests now run in two modes:
- Local: reads shuffle data from local filesystem (default)
- RemoteFlight: forces remote read through flight service

Co-Authored-By: Claude Opus 4.5 <[email protected]>

@andygrove (Member, Author) commented:

Thanks for the review @milenkovicm! I've addressed the feedback in the following commits:

Addressed

1. Return stream instead of Vec<RecordBatch> (d4fcf1a)

Added stream_sort_shuffle_partition() that returns a SendableRecordBatchStream instead of collecting all batches into a vector. This reads batches lazily on-demand, reducing memory usage for large partitions.
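
Roughly, the streaming read looks like this (a hedged sketch assuming DataFusion's RecordBatchStreamAdapter; names and details may differ from the actual commit):

```rust
use std::fs::File;
use std::path::Path;

use arrow::ipc::reader::FileReader;
use datafusion::error::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Lazily stream the batches for one output partition instead of collecting
/// them into a Vec. `start..end` is the batch range taken from the index file.
fn stream_partition(
    data_path: &Path,
    start: usize,
    end: usize,
) -> Result<SendableRecordBatchStream> {
    let mut reader = FileReader::try_new(File::open(data_path)?, None)?;
    let schema = reader.schema();
    reader.set_index(start)?; // seek via the IPC footer, no scanning
    let batches = reader
        .take(end - start)
        .map(|r| r.map_err(Into::into)); // ArrowError -> DataFusionError
    Ok(Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::iter(batches),
    )))
}
```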

2. Documentation improvements (0445c48)

  • Added the Spark algorithm description: "Internally, results from individual map tasks are kept in memory until they can't fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks."
  • Fixed wording: "that writes" → "It writes"

3. Flight service support for remote reads (53b5d57)

  • Updated do_get to detect sort-based shuffle files and read only the requested partition using the index
  • Added detection in IO_BLOCK_TRANSPORT to return an informative error (block transport can't support sort-based shuffle since it transfers the entire file containing all partitions)

4. Parameterized tests with rstest (a297d47)

Added remote flight read tests using rstest to parameterize all 15 sort shuffle tests. Tests now run in two modes:

  • Local: reads shuffle data from local filesystem
  • RemoteFlight: forces remote read through flight service with BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ=true and BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT=true

All 32 tests pass (15 tests × 2 modes + 2 comparison tests).
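
For reference, the rstest parameterization looks roughly like this (a sketch; the ReadMode enum and test names are illustrative, not the exact test code):

```rust
use rstest::rstest;

/// Illustrative read modes matching the two configurations described above.
#[derive(Debug, Clone, Copy)]
enum ReadMode {
    Local,
    RemoteFlight,
}

#[rstest]
#[case::local(ReadMode::Local)]
#[case::remote_flight(ReadMode::RemoteFlight)]
fn sort_shuffle_roundtrip(#[case] mode: ReadMode) {
    // In RemoteFlight mode the session config additionally sets
    // BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ = "true" and
    // BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT = "true".
    println!("running with {mode:?}");
}
```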

Notes

  • Streaming for spill.rs: Evaluated but deferred; batches flow through quickly during finalization, so the memory impact is less critical
  • Block transport: Returns Status::unimplemented for sort-based shuffle with a message directing users to enable flight transport

andygrove and others added 2 commits January 18, 2026 09:36

Co-Authored-By: Claude Opus 4.5 <[email protected]>

@milenkovicm (Contributor) commented:

Thanks @andygrove, I will try to have a look later.

One question; I'm not sure if it makes sense:

  • What if, instead of spilling to a temporary file, the spill went to the output file directly, with the index keeping more than one partition ID -> offset mapping?
  • Reads would need to do a few more file seeks, since batches for the same partition would be scattered around, but that should not be too bad, as reads should be able to fetch many batches together (batches are buffered before write). This would save the spill batch reconciliation at the end.

@andygrove (Member, Author) commented:

> Thanks @andygrove, I will try to have a look later.
>
> One question; I'm not sure if it makes sense:
>
> • What if, instead of spilling to a temporary file, the spill went to the output file directly, with the index keeping more than one partition ID -> offset mapping?
> • Reads would need to do a few more file seeks, since batches for the same partition would be scattered around, but that should not be too bad, as reads should be able to fetch many batches together (batches are buffered before write). This would save the spill batch reconciliation at the end.

That's an interesting idea. I will experiment with that in a separate PR.

@milenkovicm (Contributor) left a comment:

It looks good to me, @andygrove.
There is one vector which should be a stream, and one minor nitpick.

return Ok(Response::new(
Box::pin(flight_data_stream) as Self::DoGetStream
));
}

@milenkovicm:

Nitpick: consider adding an else branch.

}

/// Reads all spill files for a partition and returns the batches.
pub fn read_spill_files(&self, partition_id: usize) -> Result<Vec<RecordBatch>> {

@milenkovicm:

Do we need to keep this as a vector? Perhaps just return a StreamReader?

@andygrove (Member, Author) commented:

Thanks for the review @milenkovicm. I'll go ahead and merge and will keep iterating on this to address the remaining feedback.

@andygrove merged commit 91558a5 into apache:main on Jan 21, 2026
16 checks passed