perf(parquet): add intra-row-group column parallelism to arrow-rs reader#6423
desmondcheongzx wants to merge 7 commits into main from
Conversation
Parallelize column decoding within each row group across all four read paths. Opens separate readers per column with ProjectionMask::roots, decodes independently, and hconcats results. Supports two-phase decode when predicates are pushed (serial predicate phase, parallel data phase with refined RowSelection). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Column-parallel decode opened the file independently for each column task, adding 16x open() syscall overhead on a 16-column file. Read the file into a bytes::Bytes buffer once and share it across column tasks via cheap Bytes::clone() (atomic refcount, zero-copy). Each column reader gets its own independent cursor over the shared buffer. This fixes the CodSpeed regression in test_show[1 Small File] where the per-column file opens added ~2.6ms overhead on a small 1024-row file. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
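The shared-buffer approach can be sketched with std types only — here `Arc<[u8]>` stands in for `bytes::Bytes`, and `share_buffer` is an illustrative analogue, not the PR's code:

```rust
use std::io::{Cursor, Read};
use std::sync::Arc;

// Hypothetical sketch: read the file once, then hand each column task its
// own cursor over the shared allocation. Cloning an `Arc<[u8]>` (like
// `Bytes::clone`) bumps an atomic refcount; the data is never copied.
fn share_buffer(file_contents: Vec<u8>, num_cols: usize) -> Vec<Cursor<Arc<[u8]>>> {
    let shared: Arc<[u8]> = file_contents.into();
    (0..num_cols)
        // Each cursor has an independent seek position over the same bytes.
        .map(|_| Cursor::new(Arc::clone(&shared)))
        .collect()
}

fn main() {
    let mut readers = share_buffer(vec![1, 2, 3, 4], 16);
    assert_eq!(readers.len(), 16);
    // Cursors advance independently: reading from one does not move another.
    let mut buf = [0u8; 2];
    readers[0].read_exact(&mut buf).unwrap();
    assert_eq!(buf, [1, 2]);
    readers[1].read_exact(&mut buf).unwrap();
    assert_eq!(buf, [1, 2]);
}
```

One allocation shared by 16 column tasks replaces 16 `open()` syscalls on a 16-column file.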
Add MIN_RG_BYTES_FOR_COL_PARALLELISM threshold (16 MiB uncompressed) to fall back to decode_single_rg for small row groups where per-column reader overhead (metadata clones, buffer setup, hconcat) exceeds the benefit of parallel decode. Applied to both local streaming (Path 2) and local bulk (Path 1) read paths. The CodSpeed benchmark file (1024 rows, 16 cols, ~880KB uncompressed) now takes the single-reader fast path instead of spawning 16 column tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ndle I/O

The previous approach read the entire file into a bytes::Bytes buffer upfront, which added ~380ms of overhead for a 728MB file before any decode work started. To fix this, each column task now opens its own file handle via File::open (~microsecond syscall, independent seek position). The OS page cache serves subsequent reads from memory, so there is no redundant I/O. This eliminated the upfront read bottleneck and brought all_cols 8RG from 1440ms to 990ms (parity with parquet2's 996ms).

Additional threshold tuning:
- MIN_COLS_FOR_COL_PARALLELISM = 3: routes 1-2 column reads to the simpler per-RG fallback path where column splitting overhead isn't justified.
- RG count check (rg_tasks < num_cpus * 2): when row groups already saturate cores (e.g. 64 RGs on 8 cores), per-RG decode is more efficient than column splitting with its per-builder overhead.
- Async paths use MIN_COLS_FOR_COL_PARALLELISM consistently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
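The gating described in this commit can be summarized as a single predicate. The constant names mirror the PR's, but the function itself is a hypothetical sketch, not the PR's exact code:

```rust
// Illustrative gating sketch: column parallelism only pays off when the
// table is wide, the row group is large, and row groups alone do not
// already saturate the available cores.
const MIN_COLS_FOR_COL_PARALLELISM: usize = 3;
const MIN_RG_BYTES_FOR_COL_PARALLELISM: u64 = 16 * 1024 * 1024; // 16 MiB uncompressed

fn use_col_parallelism(num_cols: usize, rg_bytes: u64, rg_tasks: usize, num_cpus: usize) -> bool {
    num_cols >= MIN_COLS_FOR_COL_PARALLELISM
        && rg_bytes >= MIN_RG_BYTES_FOR_COL_PARALLELISM
        // If row groups already saturate cores, per-RG decode wins.
        && rg_tasks < num_cpus * 2
}

fn main() {
    // 1-2 columns: fall back to per-RG decode.
    assert!(!use_col_parallelism(2, 1 << 30, 1, 8));
    // Small row group (~880 KB benchmark file): fall back.
    assert!(!use_col_parallelism(16, 880 * 1024, 1, 8));
    // 64 RGs on 8 cores: row groups already saturate cores.
    assert!(!use_col_parallelism(16, 1 << 30, 64, 8));
    // Wide table, big RG, few RGs: split columns.
    assert!(use_col_parallelism(16, 1 << 30, 8, 8));
}
```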
Greptile Summary

This PR adds intra-row-group column parallelism to the arrow-rs parquet reader across all four read paths (local bulk, local stream, remote bulk, remote stream), targeting wide tables with few row groups. When a table has ≥ 3 columns and a row group exceeds 16 MiB, columns are decoded in parallel via separate per-column readers and merged with hconcat_record_batches.

Key issues:
Confidence Score: 2/5
Important Files Changed
Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Read Parquet Request] --> B{Local or Remote?}
B -- Local sync --> C[local_parquet_read_arrowrs]
B -- Local stream --> D[local_parquet_stream_arrowrs]
B -- Remote bulk --> E[read_parquet_single_arrowrs]
B -- Remote stream --> F[stream_parquet_single_arrowrs]
C --> G{use_col_parallelism?}
G -- No --> H[decode_single_rg per RG\nrayon par_iter over RGs]
G -- Yes --> I{predicate_pushed?}
I -- Yes --> J[Phase 1: decode pred cols per RG\nrayon par_iter]
I -- No --> K[flat col_tasks par_iter\nover RG×col pairs]
J --> L[Phase 2: decode data cols\nrayon par_iter over RG×col]
L --> M[hconcat per RG\nfinalize_batch]
K --> M
D --> N[For each RG task]
N --> O[decode_single_rg_col_parallel\nrayon col split inside]
E --> P{all_col_indices < 3?}
P -- Yes fallback --> Q[Original single stream\nwith_offset + RowFilter]
P -- No --> R{predicate_pushed?}
R -- Yes --> S[Phase 1: concurrent pred col decode\ntry_join_all]
R -- No --> T[concurrent col decode\ntry_join_all RG×col]
S --> U[Phase 2: concurrent data col decode\nper RG semaphore-gated]
U --> V[hconcat per RG\nfinalize_batch + limit]
T --> V
F --> W{all_col_indices < 3?}
W -- Yes fallback --> X[Original stream mapped\nwith_offset + RowFilter]
W -- No --> Y[Stream per RG\nasync then closure]
Y --> Z[col_futs try_join_all per RG\nhconcat + finalize_batch]
Z --> AA[scan-based limit applied\nto output stream]
Last reviewed commit: "fix(parquet): addres..."
```rust
let mut all_batches = vec![pred_batch];
all_batches.extend(col_batches);
let merged = hconcat_record_batches(&all_batches)?;
let daft_batch = RecordBatch::try_from(&merged)?;
finalize_batch(
    daft_batch,
    None,
    true,
    &setup.read_daft_schema,
    &setup.return_daft_schema,
)
```
Column ordering corruption when predicate columns are returned
When predicate_pushed=true and column parallelism is active, pred_batch (predicate columns) is prepended to data_batches in hconcat_record_batches. This produces a merged schema of [pred_cols..., data_cols...] instead of file-schema order.
finalize_batch only reorders via get_columns when read_schema.len() != return_schema.len(). If the user requests all columns (or their requested set already contains the predicate columns), both lengths are equal and no reordering occurs — the output columns land in the wrong order.
Concrete example: file columns [a, b, c], predicate on c, user reads all.
- `pred_batch` schema: `[c]`
- `data_batches` schemas: `[a]`, `[b]`
- merged schema: `[c, a, b]` ← predicate column first
- `finalize_batch`: `read_schema.len() == return_schema.len()` → skips reorder
- Output: `[c, a, b]` instead of `[a, b, c]`
The same bug exists in all four paths that call finalize_batch(…, None, true, …) after hconcat_record_batches:
- `decode_single_rg_col_parallel` (line ~731)
- `local_parquet_read_arrowrs` (line ~1895)
- `read_parquet_single_arrowrs` (line ~1279)
- `stream_parquet_single_arrowrs` (line ~2410)
The fix is to unconditionally reorder in finalize_batch (look up every return column by name from the table, not only when lengths differ), or to sort the hconcatenated batch columns to match return_daft_schema order before calling finalize_batch.
```rust
// In finalize_batch, change:
if read_schema.len() != return_schema.len() {
// to:
if read_schema != return_schema {
```

This will also reorder when lengths happen to match but ordering differs.
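The name-based reordering the fix describes can be sketched with std types. `Column` and `reorder_to_schema` are hypothetical stand-ins for the arrow batch and the lookup inside `finalize_batch`:

```rust
use std::collections::HashMap;

// A "column" is a name plus its values, standing in for an arrow column.
type Column = (String, Vec<i64>);

// Look up every return-schema column by name, regardless of whether the
// lengths of the two schemas happen to match. Returns None if a requested
// column is missing.
fn reorder_to_schema(columns: Vec<Column>, return_order: &[&str]) -> Option<Vec<Column>> {
    let mut by_name: HashMap<String, Vec<i64>> = columns.into_iter().collect();
    return_order
        .iter()
        .map(|name| by_name.remove(*name).map(|v| (name.to_string(), v)))
        .collect()
}

fn main() {
    // Merged batch came out as [c, a, b] (predicate column first).
    let merged = vec![
        ("c".to_string(), vec![7]),
        ("a".to_string(), vec![1]),
        ("b".to_string(), vec![3]),
    ];
    let fixed = reorder_to_schema(merged, &["a", "b", "c"]).unwrap();
    let names: Vec<_> = fixed.iter().map(|(n, _)| n.as_str()).collect();
    assert_eq!(names, ["a", "b", "c"]);
}
```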
```rust
if all_col_indices.len() < 3 || rg_byte_size < MIN_RG_BYTES_FOR_COL_PARALLELISM {
    return decode_single_rg(path, setup, task, predicate, None);
```
Magic number `3` instead of constant
This hardcodes 3 instead of using MIN_COLS_FOR_COL_PARALLELISM. If the threshold is ever changed, this call site would be missed. Additionally, the comment just above (line 685) says "even 2 columns benefit from splitting", which contradicts the < 3 check — the comment should be corrected to align with the actual threshold.
```diff
-if all_col_indices.len() < 3 || rg_byte_size < MIN_RG_BYTES_FOR_COL_PARALLELISM {
+if all_col_indices.len() < MIN_COLS_FOR_COL_PARALLELISM || rg_byte_size < MIN_RG_BYTES_FOR_COL_PARALLELISM {
     return decode_single_rg(path, setup, task, predicate, None);
```
```rust
fn bool_array_to_row_selection(mask: &arrow::array::BooleanArray) -> RowSelection {
    use arrow::array::Array;
```
Inline import inside function body
use arrow::array::Array is placed inside the function rather than at the top of the file. Per project convention, imports should be at the top of the file.
```diff
 fn bool_array_to_row_selection(mask: &arrow::array::BooleanArray) -> RowSelection {
-    use arrow::array::Array;
     let mut selectors = Vec::new();
```
And add use arrow::array::Array; to the top-level use block at the top of the file.
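For context, the conversion this helper performs — collapsing a boolean mask into alternating skip/select runs, as parquet's `RowSelection` represents them — can be sketched without arrow. The `Selector` enum here is a stand-in for `parquet::arrow::arrow_reader::RowSelector`, and the function is illustrative, not the PR's exact code:

```rust
// Run-length representation of which rows to decode.
#[derive(Debug, PartialEq)]
enum Selector {
    Select(usize),
    Skip(usize),
}

// Collapse a per-row boolean mask into maximal runs: extend the last run
// when the flag matches, otherwise start a new one.
fn mask_to_selectors(mask: &[bool]) -> Vec<Selector> {
    let mut selectors: Vec<Selector> = Vec::new();
    for &keep in mask {
        let extended = match selectors.last_mut() {
            Some(Selector::Select(n)) if keep => { *n += 1; true }
            Some(Selector::Skip(n)) if !keep => { *n += 1; true }
            _ => false,
        };
        if !extended {
            selectors.push(if keep { Selector::Select(1) } else { Selector::Skip(1) });
        }
    }
    selectors
}

fn main() {
    let sel = mask_to_selectors(&[true, true, false, false, false, true]);
    assert_eq!(
        sel,
        vec![Selector::Select(2), Selector::Skip(3), Selector::Select(1)]
    );
}
```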
Rule Used: Import statements should be placed at the top of t... (source)
Learnt From
Eventual-Inc/Daft#5078
```rust
#[allow(clippy::too_many_arguments, clippy::ref_option)]
async fn decode_rg_predicate_phase_async(
```
Suppressing clippy warnings instead of fixing them
#[allow(clippy::too_many_arguments, clippy::ref_option)] silences two clippy warnings rather than fixing the underlying issues. Per project convention, clippy warnings should be resolved rather than suppressed.
- `clippy::ref_option`: `&Option<IOStatsRef>` should be `Option<&IOStatsRef>` (avoids double indirection). The same attribute also appears on `decode_rg_column_async` at line 856.
- `clippy::too_many_arguments`: consider grouping related parameters (e.g., `io_client`, `io_stats`, `parquet_metadata`, `arrow_schema`) into a shared context struct — this pattern is already used elsewhere in this file with `LocalParquetSetup`.
Rule Used: Fix clippy warnings instead of suppressing them wi... (source)
Learnt From
Eventual-Inc/Daft#5207
- Fix column ordering bug: finalize_batch now reorders when schema order differs (not just length), fixing predicate column misordering after hconcat
- Use MIN_COLS_FOR_COL_PARALLELISM constant instead of magic number 3
- Move inline `use arrow::array::Array` to top-level imports

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@greptile
```rust
#[allow(clippy::too_many_arguments, clippy::ref_option)]
async fn decode_rg_column_async(
    uri: &str,
    io_client: &Arc<IOClient>,
    io_stats: &Option<IOStatsRef>,
    parquet_metadata: &Arc<ParquetMetaData>,
    arrow_schema: &Arc<arrow::datatypes::Schema>,
    col_root_index: usize,
    rg_idx: usize,
    row_selection: Option<RowSelection>,
) -> DaftResult<arrow::array::RecordBatch> {
    let reader = DaftAsyncFileReader::new(
        uri.to_string(),
        io_client.clone(),
        io_stats.clone(),
        Some(parquet_metadata.clone()),
        None,
    );
    let options = ArrowReaderOptions::new().with_schema(arrow_schema.clone());
    let arrow_reader_metadata =
        ArrowReaderMetadata::try_new(parquet_metadata.clone(), options).map_err(parquet_err)?;
    let mut builder =
        ParquetRecordBatchStreamBuilder::new_with_metadata(reader, arrow_reader_metadata);

    let mask = ProjectionMask::roots(builder.parquet_schema(), std::iter::once(col_root_index));
    builder = builder
        .with_projection(mask)
        .with_row_groups(vec![rg_idx])
        .with_batch_size(DEFAULT_BATCH_SIZE);

    if let Some(sel) = row_selection {
        builder = builder.with_row_selection(sel);
    }

    let stream = builder.build().map_err(parquet_err)?;
    let arrow_batches: Vec<arrow::array::RecordBatch> =
        stream.try_collect().await.map_err(parquet_err)?;

    if arrow_batches.is_empty() {
        let field = arrow_schema.field(col_root_index).clone();
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![field]));
        Ok(arrow::array::RecordBatch::new_empty(schema))
    } else if arrow_batches.len() == 1 {
        Ok(arrow_batches.into_iter().next().unwrap())
    } else {
        arrow::compute::concat_batches(&arrow_batches[0].schema(), &arrow_batches)
            .map_err(|e| parquet_err(e).into())
    }
}
```
batch_size silently ignored in async column-parallel decode helpers
decode_rg_column_async and decode_rg_predicate_phase_async both hardcode DEFAULT_BATCH_SIZE (line 801 and line 885). The batch_size parameter passed to read_parquet_single_arrowrs and stream_parquet_single_arrowrs is correctly forwarded in the fallback single-stream path (e.g. line 1138 let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); builder2 = builder2.with_batch_size(batch_size);), but completely ignored in the column-parallel code paths that call these two helpers.
This breaks the API contract: users who set a custom batch_size to control memory usage will see their setting respected only when column parallelism is disabled (few columns / small RGs), but silently overridden with DEFAULT_BATCH_SIZE on the hot path.
Both functions should accept a batch_size: usize parameter (or Option<usize>) and thread it through with_batch_size(...):
```rust
// In decode_rg_column_async signature:
async fn decode_rg_column_async(
    ...
    batch_size: usize,
) -> DaftResult<arrow::array::RecordBatch> {
    ...
    builder = builder.with_batch_size(batch_size);
}

// In decode_rg_predicate_phase_async signature:
async fn decode_rg_predicate_phase_async(
    ...
    batch_size: usize,
) -> DaftResult<(arrow::array::RecordBatch, RowSelection)> {
    ...
    builder = builder.with_batch_size(batch_size);
}
```

And thread `batch_size.unwrap_or(DEFAULT_BATCH_SIZE)` through all call sites in `read_parquet_single_arrowrs` and `stream_parquet_single_arrowrs`.
```rust
let rg_rows = setup.parquet_metadata.row_group(task.rg_idx).num_rows() as usize;
let mut sel = build_base_row_selection(&setup, task, rg_rows);
if !setup.predicate_pushed && task.local_offset > 0 {
    let rg_rows = setup.parquet_metadata.row_group(task.rg_idx).num_rows() as usize;
    let offset_sel = build_offset_row_selection(task.local_offset, rg_rows);
```
rg_rows is computed on line 1922 and then recomputed with an identical expression on line 1925 inside the inner if block. The inner declaration shadows the outer one unnecessarily.
```diff
 let rg_rows = setup.parquet_metadata.row_group(task.rg_idx).num_rows() as usize;
 let mut sel = build_base_row_selection(&setup, task, rg_rows);
 if !setup.predicate_pushed && task.local_offset > 0 {
-    let rg_rows = setup.parquet_metadata.row_group(task.rg_idx).num_rows() as usize;
     let offset_sel = build_offset_row_selection(task.local_offset, rg_rows);
     sel = combine_selections(sel, Some(offset_sel));
```
```rust
if predicate.is_some()
    && let Some(limit) = num_rows
{
    table = table.head(limit)?;
}
if predicate.is_none()
    && let Some(limit) = num_rows
{
    table = table.head(limit)?;
}
```
Redundant mutually-exclusive limit checks
The two if branches are mutually exclusive (predicate.is_some() vs predicate.is_none()) and together they always apply table.head(limit)?. They can be collapsed into a single unconditional check. The same pattern appears in local_parquet_read_arrowrs at lines 1973–1981.
```diff
-if predicate.is_some()
-    && let Some(limit) = num_rows
-{
-    table = table.head(limit)?;
-}
-if predicate.is_none()
-    && let Some(limit) = num_rows
-{
-    table = table.head(limit)?;
-}
+if let Some(limit) = num_rows {
+    table = table.head(limit)?;
+}
 Ok(table)
```
- Remove redundant rg_rows shadow variable
- Collapse mutually-exclusive limit checks into single unconditional check
- Remove unused has_predicate variable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@greptile
```rust
let offset_sel = if use_offset_selection || (!predicate_pushed && global_start > 0) {
    let local_offset = global_start.saturating_sub(rg_start_in_stream);
    if local_offset > 0 {
        Some(build_offset_row_selection(local_offset, rg_rows))
    } else {
        None
    }
} else {
    None
};

let delete_sel = if let Some(deletes) = delete_rows
    && !deletes.is_empty()
{
    Some(build_single_rg_delete_selection(
        deletes,
        rg_global_starts[rg_idx],
        rg_rows,
    ))
} else {
    None
};

let offset_selection = if use_offset_selection {
    Some(build_offset_row_selection(
        start_offset.unwrap(),
        total_selected_rows,
    ))
} else {
    None
};
combine_selections(offset_sel, delete_sel)
```
Offset + delete semantics diverge between fallback and column-parallel paths
When !predicate_pushed, start_offset > 0, and delete_rows are present, the fallback path and the column-parallel path produce different results.
Fallback path (all_col_indices.len() < MIN_COLS_FOR_COL_PARALLELISM): applies with_offset(start_offset) after with_row_selection(delete_sel). Because arrow-rs applies with_offset over the post-selection row stream, this skips the first N non-deleted rows.
Column-parallel path (here): combine_selections(offset_sel, delete_sel) produces an intersection — rows that are both at position ≥ N AND not deleted. This skips the first N rows by physical position regardless of deletion status.
Concrete example: rows 0–9, delete_row=2, start_offset=3.
- Fallback: non-deleted rows are {0,1,3,4,5,6,7,8,9}; skip first 3 → output starts at row 4.
- Column-parallel: rows ≥ 3 AND not deleted = {3,4,5,6,7,8,9}; output starts at row 3.
This is a behavioral regression for users who rely on Iceberg positional deletes combined with a non-predicate-pushed scan with an offset. The same inconsistency exists in stream_parquet_single_arrowrs at line 2202.
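The divergence is easy to reproduce with a toy model of the two semantics. Both functions below are illustrative sketches, not the PR's code, using the review's concrete example (rows 0–9, delete_row = 2, start_offset = 3):

```rust
// Fallback path: with_offset runs over the post-selection stream, so it
// skips the first N *non-deleted* rows.
fn fallback_semantics(rows: usize, deleted: &[usize], offset: usize) -> Vec<usize> {
    (0..rows)
        .filter(|r| !deleted.contains(r))
        .skip(offset)
        .collect()
}

// Column-parallel path: intersection of selections, i.e. physical
// position >= N AND not deleted.
fn col_parallel_semantics(rows: usize, deleted: &[usize], offset: usize) -> Vec<usize> {
    (0..rows)
        .filter(|r| *r >= offset && !deleted.contains(r))
        .collect()
}

fn main() {
    let fb = fallback_semantics(10, &[2], 3);
    let cp = col_parallel_semantics(10, &[2], 3);
    assert_eq!(fb[0], 4); // fallback output starts at row 4
    assert_eq!(cp[0], 3); // column-parallel output starts at row 3
    assert_ne!(fb, cp); // the two paths disagree
}
```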
```rust
// Non-pushed offset for predicate phase.
if !setup.predicate_pushed {
    builder = builder.with_offset(task.local_offset);
}
```
Dead code branch in `decode_rg_predicate_phase`
decode_rg_predicate_phase is only ever called when setup.predicate_pushed == true (in both decode_single_rg_col_parallel and local_parquet_read_arrowrs, the call sites are inside if setup.predicate_pushed { ... }). The if !setup.predicate_pushed branch at line 545 is therefore unreachable dead code. More importantly, it is also misleading — it suggests the function is designed for use in the non-pushed case, which could cause a correctness problem if a future caller invokes it without pushdown (the offset would be double-applied: once via build_base_row_selection for the delete selection, and again via with_offset).
The branch should be removed to accurately reflect the function's invariant, or the function should be documented with a // Only called when predicate_pushed == true assertion.
```diff
-// Non-pushed offset for predicate phase.
-if !setup.predicate_pushed {
-    builder = builder.with_offset(task.local_offset);
-}
 // Apply base row selection (offset + deletes).
 let base_selection = build_base_row_selection(setup, task, rg_rows);
 if let Some(ref sel) = base_selection {
     builder = builder.with_row_selection(sel.clone());
 }
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@ Coverage Diff @@
## main #6423 +/- ##
==========================================
- Coverage 74.81% 74.44% -0.38%
==========================================
Files 1021 1021
Lines 136570 137547 +977
==========================================
+ Hits 102172 102394 +222
- Misses 34398 35153 +755
```
…dead code

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
desmondcheongzx left a comment
@greptileai review
The arrow-rs parquet reader parallelizes across row groups but decodes all columns serially within each RG. For wide tables with few row groups, this leaves CPU cores idle. This PR adds intra-RG column parallelism by opening separate per-column readers with `ProjectionMask::roots`, decoding independently, and hconcat-ing results.

All four read paths are modified:
- `local_parquet_read_arrowrs` (sync bulk): `par_iter` over `(RG, col)` pairs
- `local_parquet_stream_arrowrs` (sync stream)
- `read_parquet_single_arrowrs` (async bulk): `try_join_all` over `(RG, col)` async tasks
- `stream_parquet_single_arrowrs` (async stream)

When a predicate is pushed, decode uses a two-phase approach: predicate columns are decoded first (serial per RG) to compute a `RowSelection`, then data columns are decoded in parallel using the refined selection. Results are reassembled via `hconcat_record_batches`.

Column parallelism is gated by `MIN_COLS_FOR_COL_PARALLELISM` (3 columns) and `MIN_RG_BYTES_FOR_COL_PARALLELISM` (16 MiB uncompressed), falling back to the single-reader path when overhead would exceed benefit. Single-column reads use the existing `decode_single_rg` path with no overhead.

Shared helpers added: `bool_array_to_row_selection`, `refine_selection`, `hconcat_record_batches`, `build_base_row_selection`, `compute_root_indices`, and sync/async per-column decode functions.

This is part 1 of #6353, split for reviewability. Part 2 (S3 byte range prefetching + async scheduling) builds on this.
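The two-phase refinement can be modeled with plain boolean masks. `refine_selection` here is an illustrative sketch of the idea, not the PR's implementation: the base selection is a flag per physical row, while the predicate mask was evaluated only over the rows that base selection kept.

```rust
// Narrow a base selection by a predicate mask evaluated over the
// *selected* rows only: each selected row consumes the next predicate
// flag; unselected rows stay unselected.
fn refine_selection(base: &[bool], predicate_over_selected: &[bool]) -> Vec<bool> {
    let mut pred = predicate_over_selected.iter();
    base.iter()
        // `&&` short-circuits, so only selected rows consume a flag.
        .map(|&selected| selected && *pred.next().expect("mask shorter than selection"))
        .collect()
}

fn main() {
    // 6 physical rows; base selection keeps rows 1, 2, 4, 5.
    let base = [false, true, true, false, true, true];
    // Predicate over those 4 selected rows keeps the 1st and 4th of them.
    let pred = [true, false, false, true];
    let refined = refine_selection(&base, &pred);
    assert_eq!(refined, [false, true, false, false, false, true]);
}
```

The data-column phase then decodes only the rows the refined selection keeps.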