
perf(parquet): column parallelism + S3 byte range prefetching for arrow-rs reader#6353

Draft
desmondcheongzx wants to merge 11 commits into main from desmond/intra-rg-col-parallelism

Conversation


@desmondcheongzx desmondcheongzx commented Mar 6, 2026

The arrow-rs parquet reader was 15-25x slower than parquet2 on S3 reads due to three root causes: no cross-column I/O coalescing, pseudo-sequential async decode, and insufficient task parallelism. This PR closes the gap to parity.

S3 Benchmarks (TPC-H lineitem ~728MB, c7i.4xlarge)

| Test | RGs | parquet2 | arrow-rs (before) | arrow-rs (after) |
|----------|-------|--------|--------|--------|
| all_cols | 1 RG  | 2191ms | 4834ms | 1982ms |
| all_cols | 8 RG  | 1355ms | 5638ms | 1355ms |
| all_cols | 64 RG | 1406ms | 5592ms | 1434ms |
| 3_contig | 1 RG  | 778ms  | 1326ms | 869ms  |
| 3_contig | 8 RG  | 562ms  | 1219ms | 638ms  |
| 3_contig | 64 RG | 630ms  | 1341ms | 754ms  |

Root Causes and Fixes

1. No cross-column I/O coalescing (15-25x slowdown)

Each per-column DaftAsyncFileReader ran its own ReadPlanner, so coalescing only saw one column's byte ranges at a time. Parquet2 computed ALL column byte ranges upfront and coalesced them into fewer large HTTP requests.

To fix this, prefetch_column_ranges() computes all (RG, column) byte ranges and feeds them through a single ReadPlanner. The coalesced data is cached in an Arc<RangesContainer>, and each per-column reader gets a PrefetchedAsyncFileReader that serves get_byte_ranges() from cache with zero HTTP requests.
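The coalescing idea behind the single ReadPlanner can be sketched as follows. This is an illustrative reconstruction, not the actual ReadPlanner code: the function name, the 1 MiB gap constant, and the plain `Range<u64>` representation are all assumptions made for the example.

```rust
// Hypothetical sketch: gather every (row group, column) byte range first,
// then merge ranges whose gap is small enough that one larger HTTP request
// beats two round trips. The gap threshold here is illustrative.
use std::ops::Range;

const MAX_GAP: u64 = 1024 * 1024; // merge ranges separated by < 1 MiB

fn coalesce(mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate request.
            Some(last) if r.start <= last.end + MAX_GAP => {
                last.end = last.end.max(r.end);
            }
            _ => out.push(r),
        }
    }
    out
}

fn main() {
    // Two nearby column chunks plus one far-away chunk collapse from
    // three requests into two.
    let ranges = vec![0..100, 150..300, 10_000_000..10_000_100];
    let merged = coalesce(ranges);
    assert_eq!(merged, vec![0..300, 10_000_000..10_000_100]);
}
```

The key difference from the per-column setup is only that all columns' ranges enter this function together, so cross-column adjacency becomes visible to the merge.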

2. Pseudo-sequential async decode

try_join_all polls all sub-futures from a single tokio task, so CPU-bound column decode ran on one worker thread despite there being 16 column tasks. Replaced with get_compute_runtime().spawn() per task, creating independent tasks that tokio's work-stealing scheduler distributes across all DAFTCPU threads (NUM_CPUS).
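The scheduling difference can be illustrated without tokio. The PR spawns each decode on the compute runtime; this std::thread sketch shows the same shape, one independent task per column joined at the end, with `decode_column` standing in for real decode work (both names are hypothetical):

```rust
// Illustrative sketch: independently spawned tasks can run on separate
// worker threads, unlike sub-futures polled from one task.
use std::thread;

// Stand-in for CPU-bound column decode; returns fake row values.
fn decode_column(col: usize) -> Vec<u64> {
    (0..4).map(|row| (col * 10 + row) as u64).collect()
}

fn decode_all(num_cols: usize) -> Vec<Vec<u64>> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..num_cols)
            .map(|col| s.spawn(move || decode_column(col)))
            .collect();
        // Joining in spawn order preserves column order even though the
        // tasks themselves ran in parallel.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let cols = decode_all(3);
    assert_eq!(cols[2], vec![20, 21, 22, 23]);
}
```

With try_join_all, all of that work would be polled from one task and could only occupy one thread at a time; spawning hands each unit to the scheduler separately.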

3. Insufficient task parallelism for many-RG files

Per-column streams (16-way parallelism) forced each task to process all 64 RGs sequentially. Switched to per-(RG, col) tasks on the compute runtime, giving N_RGs * N_cols parallelism (1024 tasks for 64 RGs and 16 cols). Results are grouped by column, concat'd per-column in parallel, then hconcat'd once via spawn_column_decode().
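The group-and-concat step after the per-(RG, col) tasks finish can be sketched like this. It is a simplified stand-in for spawn_column_decode(): results arrive in arbitrary completion order, get bucketed by column, sorted back into row-group order, and concatenated. `Vec<u64>` replaces real Arrow arrays, and the function name is hypothetical.

```rust
// Hedged sketch: group per-(row group, column) task results by column,
// restore row-group order within each column, then concatenate.
use std::collections::BTreeMap;

fn group_and_concat(results: Vec<(usize, usize, Vec<u64>)>) -> Vec<Vec<u64>> {
    let mut by_col: BTreeMap<usize, Vec<(usize, Vec<u64>)>> = BTreeMap::new();
    for (rg, col, data) in results {
        by_col.entry(col).or_default().push((rg, data));
    }
    by_col
        .into_values()
        .map(|mut chunks| {
            chunks.sort_by_key(|(rg, _)| *rg); // tasks finish out of order
            chunks.into_iter().flat_map(|(_, d)| d).collect()
        })
        .collect()
}

fn main() {
    // Results for 2 RGs x 2 cols, arriving out of order.
    let out = group_and_concat(vec![
        (1, 0, vec![3, 4]),
        (0, 1, vec![10]),
        (0, 0, vec![1, 2]),
        (1, 1, vec![20]),
    ]);
    assert_eq!(out, vec![vec![1, 2, 3, 4], vec![10, 20]]);
}
```

In the real code each per-column concat runs in parallel and the final step is a single hconcat across the assembled columns.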

Changes

src/daft-parquet/src/async_reader.rs

  • PrefetchedAsyncFileReader: AsyncFileReader impl backed by pre-fetched RangesContainer
  • build_read_planner_and_collect(): shared ReadPlanner setup used by both DaftAsyncFileReader and the prefetch path

src/daft-parquet/src/arrowrs_reader.rs

  • prefetch_column_ranges(): bulk byte range pre-fetching with cross-column/RG coalescing
  • root_to_leaf_columns(): maps root column indices to parquet leaf indices
  • decode_rg_predicate_phase_async_prefetched() / decode_rg_column_async_prefetched(): prefetched decode variants
  • spawn_column_decode(): shared helper for per-(RG, col) task spawn, collect, group-by-column, parallel concat
  • Modified read_parquet_single_arrowrs (Path 3) and stream_parquet_single_arrowrs (Path 4) to use prefetching + compute runtime dispatch
  • Removed old DaftAsyncFileReader-based async decode helpers

Intra-RG Column Parallelism (pre-existing on branch)

Adds column-level parallelism within each row group across all four read paths (sync bulk, sync stream, async bulk, async stream). For wide tables with few row groups, this parallelizes across columns rather than just across RGs.

@github-actions github-actions bot added the perf label Mar 6, 2026

codspeed-hq bot commented Mar 6, 2026

Merging this PR will not alter performance

✅ 36 untouched benchmarks


Comparing desmond/intra-rg-col-parallelism (f7fc593) with main (24e4c1d)



codecov bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 35.96639% with 762 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.44%. Comparing base (2e68c2b) to head (f7fc593).
⚠️ Report is 22 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|--------------------------|---------|-------|
| src/daft-parquet/src/arrowrs_reader.rs | 33.86% | 742 Missing ⚠️ |
| src/daft-parquet/src/async_reader.rs | 70.58% | 20 Missing ⚠️ |

@@            Coverage Diff             @@
##             main    #6353      +/-   ##
==========================================
- Coverage   74.82%   74.44%   -0.39%     
==========================================
  Files        1022     1021       -1     
  Lines      136386   137603    +1217     
==========================================
+ Hits       102051   102435     +384     
- Misses      34335    35168     +833     
| Files with missing lines | Coverage Δ |
|--------------------------|------------|
| src/daft-parquet/src/async_reader.rs | 60.55% <70.58%> (-15.05%) ⬇️ |
| src/daft-parquet/src/arrowrs_reader.rs | 58.22% <33.86%> (-25.96%) ⬇️ |

... and 115 files with indirect coverage changes


desmondcheongzx and others added 4 commits March 11, 2026 10:33
Parallelize column decoding within each row group across all four read
paths. Opens separate readers per column with ProjectionMask::roots,
decodes independently, and hconcats results. Supports two-phase decode
when predicates are pushed (serial predicate phase, parallel data phase
with refined RowSelection).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Column-parallel decode opened the file independently for each column task,
adding 16x open() syscall overhead on a 16-column file. Read the file into
a bytes::Bytes buffer once and share it across column tasks via cheap
Bytes::clone() (atomic refcount, zero-copy). Each column reader gets its
own independent cursor over the shared buffer.

This fixes the CodSpeed regression in test_show[1 Small File] where the
per-column file opens added ~2.6ms overhead on a small 1024-row file.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add MIN_RG_BYTES_FOR_COL_PARALLELISM threshold (16 MiB uncompressed) to
fall back to decode_single_rg for small row groups where per-column reader
overhead (metadata clones, buffer setup, hconcat) exceeds the benefit of
parallel decode. Applied to both local streaming (Path 2) and local bulk
(Path 1) read paths.

The CodSpeed benchmark file (1024 rows, 16 cols, ~880KB uncompressed)
now takes the single-reader fast path instead of spawning 16 column tasks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ndle I/O

The previous approach read the entire file into a bytes::Bytes buffer upfront,
which added ~380ms of overhead for a 728MB file before any decode work started.

To fix this, each column task now opens its own file handle via File::open
(~microsecond syscall, independent seek position). The OS page cache serves
subsequent reads from memory, so there is no redundant I/O. This eliminated
the upfront read bottleneck and brought all_cols 8RG from 1440ms to 990ms
(parity with parquet2's 996ms).

Additional threshold tuning:
- MIN_COLS_FOR_COL_PARALLELISM = 3: routes 1-2 column reads to the simpler
  per-RG fallback path where column splitting overhead isn't justified.
- RG count check (rg_tasks < num_cpus * 2): when row groups already saturate
  cores (e.g. 64 RGs on 8 cores), per-RG decode is more efficient than
  column splitting with its per-builder overhead.
- Async paths use MIN_COLS_FOR_COL_PARALLELISM consistently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
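The thresholds above can be reconstructed as a single routing predicate. This is an illustrative reconstruction from the commit description, not the exact code in the PR; the function name is hypothetical, while the constants come from the text.

```rust
// Reconstructed from the threshold tuning described above: route a read to
// column-parallel decode only when all three heuristics say it will pay off.
const MIN_RG_BYTES_FOR_COL_PARALLELISM: usize = 16 * 1024 * 1024; // 16 MiB
const MIN_COLS_FOR_COL_PARALLELISM: usize = 3;

fn use_column_parallelism(
    num_cols: usize,
    rg_uncompressed_bytes: usize,
    num_rg_tasks: usize,
    num_cpus: usize,
) -> bool {
    // 1-2 columns: splitting overhead is not justified.
    num_cols >= MIN_COLS_FOR_COL_PARALLELISM
        // Small row groups: per-column reader setup dominates the benefit.
        && rg_uncompressed_bytes >= MIN_RG_BYTES_FOR_COL_PARALLELISM
        // Row groups already saturate cores: per-RG decode is cheaper.
        && num_rg_tasks < num_cpus * 2
}

fn main() {
    // 64 RGs on 8 cores already saturate the CPU: stay on per-RG decode.
    assert!(!use_column_parallelism(16, 64 << 20, 64, 8));
    // Wide table, one big RG, 8 cores: column parallelism helps.
    assert!(use_column_parallelism(16, 64 << 20, 1, 8));
}
```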
@desmondcheongzx force-pushed the desmond/intra-rg-col-parallelism branch from 71f67f6 to 193b2ab on March 16, 2026
desmondcheongzx and others added 7 commits March 17, 2026 11:01
The arrow-rs parquet reader was 5-15x slower than parquet2 on S3 because
each per-column async reader created its own DaftAsyncFileReader, so the
ReadPlanner coalescing only saw one column's byte ranges at a time.

To fix this, we compute ALL needed (RG, column) byte ranges upfront and
feed them through a single ReadPlanner. The coalesced data is cached in
an Arc<RangesContainer>, and each per-column reader gets a
PrefetchedAsyncFileReader that serves get_byte_ranges() from cache with
zero additional HTTP requests.

Changes:
- Add PrefetchedAsyncFileReader backed by pre-fetched RangesContainer
- Extract build_read_planner_and_collect() as shared helper
- Add prefetch_column_ranges() for bulk byte range pre-fetching
- Add prefetched decode variants for predicate and column phases
- Apply prefetching to all async paths (bulk + stream, fallback + col-parallel)
- Remove now-unused DaftAsyncFileReader-based async decode helpers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The prefetched async decode helpers used DEFAULT_BATCH_SIZE (8192),
causing arrow-rs to emit many small batches per column per RG that
required an expensive concat_batches step (~15% of total S3 read time).

Set batch_size to the RG row count so each column decodes in a single
pass, eliminating the intermediate concat.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…oncat

Previously, async column-parallel decode created N_RGs * N_cols tasks,
each decoding one (RG, column) pair. The results required two concat
layers: hconcat within each RG, then RecordBatch::concat across all RGs.
The final concat alone copied ~728MB of data (~21% of total time).

Now we create N_cols tasks, each running a single ParquetRecordBatchStream
across ALL RGs. This mirrors parquet2's architecture: parallel across
columns, sequential across RGs within each column. The result is one
array per column spanning all RGs, assembled with a single hconcat (no
data copy, just schema + array ref merge).

For the predicate path, phase 1 (per-RG predicate decode) is unchanged
since per-RG RowSelections are needed. Phase 2 concatenates the per-RG
selections into one combined selection and passes it to per-column
streams across all RGs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
try_join_all polls all sub-futures from a single tokio task, so
CPU-bound column decode runs pseudo-sequentially on one worker thread.
tokio::spawn creates independent tasks that the work-stealing scheduler
distributes across all available worker threads.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move column decode tasks from the I/O runtime (DAFTIO, 8 threads) to
the compute runtime (DAFTCPU, NUM_CPUS threads). After prefetching,
column decode is pure CPU work - it should run on compute threads just
like the local reader does with rayon on DAFTCPU.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace per-column streams (16-way parallelism) with per-(RG, col)
tasks (N_RGs * N_cols parallelism) on the compute runtime. For 64 RGs
and 16 columns, this gives 1024-way task parallelism instead of 16.

Results are grouped by column and concat'd per-column in parallel, then
hconcat'd once. This avoids the old per-RG hconcat + cross-RG concat
pattern and matches the local reader's parallelism strategy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tion

Extract the per-(RG, col) spawn/collect/group/concat pattern into
spawn_column_decode(), removing ~90 lines of duplicated logic between
the predicate and no-predicate paths. Also fix a duplicate comment and
merge redundant limit branches.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@desmondcheongzx changed the title from "perf(parquet): add intra-row-group column parallelism to arrow-rs reader" to "perf(parquet): column parallelism + S3 byte range prefetching for arrow-rs reader" on Mar 18, 2026