Skip to content

Commit cd2953d

Browse files
refactor(arrow2): remove parquet2 from daft (#6339)
Remove all parquet2 usage from daft crates; the only remaining usage is now in arrow2 itself. The arrow-rs reader now handles all parquet decoding, and the metadata adapter already wraps arrow-rs types, so parquet2 is fully redundant. However, we still can't remove `#![allow(deprecated, reason = "arrow2 migration")]` because daft-parquet still uses daft_arrow::array::Array, daft_arrow::datatypes::Schema, etc. for the pyarrow FFI bridge. The pyarrow export path (`read_parquet_single_into_arrow`) is rewritten to call the arrow-rs `read_parquet_single()` and convert the resulting `RecordBatch` to arrow2 chunks, replacing the parquet2-based `ParquetReaderBuilder`/`stream_reader` pipeline. For `StringEncoding::Raw` (reading invalid UTF-8), the parquet metadata is modified to strip STRING/UTF8 logical types before the arrow-rs reader sees them, so BYTE_ARRAY columns are decoded as Binary without UTF-8 validation. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8cc802b commit cd2953d

File tree

22 files changed

+584
-2060
lines changed

22 files changed

+584
-2060
lines changed

Cargo.lock

Lines changed: 0 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/error/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ serde_json = {workspace = true}
77
thiserror = {workspace = true}
88
tokio = {workspace = true}
99

10-
[dev-dependencies]
11-
parquet2 = {workspace = true}
12-
1310
[features]
1411
python = ["dep:pyo3"]
1512

src/common/error/src/error.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -127,21 +127,4 @@ mod tests {
127127
_ => panic!("Expected ByteStreamError"),
128128
}
129129
}
130-
131-
#[test]
132-
fn test_parquet_io_error_conversion() {
133-
// Ensure that parquet2 IO errors get converted into transient Byte Stream errors.
134-
let error_message = "IO error occurred";
135-
let parquet_io_error =
136-
parquet2::error::Error::IoError(std::io::Error::other(error_message));
137-
let arrow_error: daft_arrow::error::Error = parquet_io_error.into();
138-
//let arrow_error = daft_arrow::error::Error::from(parquet_io_error);
139-
let daft_error: DaftError = arrow_error.into();
140-
match daft_error {
141-
DaftError::ByteStreamError(e) => {
142-
assert_eq!(e.to_string(), format!("Io error: {error_message}"));
143-
}
144-
_ => panic!("Expected ByteStreamError"),
145-
}
146-
}
147130
}

src/daft-micropartition/src/micropartition.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use daft_dsl::{AggExpr, Expr, ExprRef};
1515
use daft_io::{IOClient, IOConfig, IOStatsRef};
1616
use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
1717
use daft_parquet::{
18-
DaftParquetMetadata, infer_arrow_schema_from_metadata,
18+
DaftParquetMetadata,
1919
read::{ParquetSchemaInferenceOptions, read_parquet_bulk, read_parquet_metadata_bulk},
2020
};
2121
use daft_recordbatch::RecordBatch;
@@ -641,11 +641,8 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
641641
let schemas = metadata
642642
.iter()
643643
.map(|m| {
644-
let schema = infer_arrow_schema_from_metadata(
645-
m.as_parquet2(),
646-
Some((*schema_infer_options).into()),
647-
)?;
648-
let daft_schema = Schema::from(schema);
644+
let daft_schema =
645+
daft_parquet::infer_schema_from_daft_metadata(m, *schema_infer_options)?;
649646
DaftResult::Ok(Arc::new(daft_schema))
650647
})
651648
.collect::<DaftResult<Vec<_>>>()?;
@@ -668,11 +665,8 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
668665
let schemas = metadata
669666
.iter()
670667
.map(|m| {
671-
let schema = infer_arrow_schema_from_metadata(
672-
m.as_parquet2(),
673-
Some((*schema_infer_options).into()),
674-
)?;
675-
let daft_schema = schema.into();
668+
let daft_schema =
669+
daft_parquet::infer_schema_from_daft_metadata(m, *schema_infer_options)?;
676670
DaftResult::Ok(Arc::new(daft_schema))
677671
})
678672
.collect::<DaftResult<Vec<_>>>()?;

src/daft-parquet/Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ name = "parquet_read"
44

55
[dependencies]
66
arrow = {workspace = true}
7-
async-compat = {workspace = true}
87
daft-arrow = {path = "../daft-arrow"}
9-
async-stream = {workspace = true}
108
bytes = {workspace = true}
119
common-error = {path = "../common/error", default-features = false}
1210
common-runtime = {path = "../common/runtime", default-features = false}
@@ -17,17 +15,13 @@ daft-recordbatch = {path = "../daft-recordbatch", default-features = false}
1715
daft-stats = {path = "../daft-stats", default-features = false}
1816
futures = {workspace = true}
1917
indexmap = {workspace = true}
20-
itertools = {workspace = true}
21-
log = {workspace = true}
2218
parquet = {workspace = true, features = ["async"]}
23-
parquet2 = {workspace = true}
2419
pyo3 = {workspace = true, optional = true}
2520
rayon = {workspace = true}
2621
serde = {workspace = true}
2722
snafu = {workspace = true}
2823
tokio = {workspace = true}
2924
tokio-stream = {workspace = true}
30-
tokio-util = {workspace = true}
3125

3226
[dev-dependencies]
3327
bincode = {workspace = true}

src/daft-parquet/src/arrowrs_reader.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
//! Arrow-rs based parquet reader.
1+
//! Parquet reader built on the arrow-rs `parquet` crate.
22
//!
3-
//! This module provides a parquet reader built on the arrow-rs `parquet` crate,
4-
//! replacing the parquet2/arrow2 decode pipeline. It uses [`DaftAsyncFileReader`]
5-
//! as the IO bridge for remote reads, and the sync `ParquetRecordBatchReaderBuilder`
6-
//! with `std::fs::File` for local reads (avoiding IOClient overhead).
3+
//! Uses [`DaftAsyncFileReader`] as the IO bridge for remote reads, and the sync
4+
//! `ParquetRecordBatchReaderBuilder` with `std::fs::File` for local reads.
75
86
use std::{
97
borrow::Borrow,
@@ -37,10 +35,12 @@ use tokio_stream::wrappers::ReceiverStream;
3735

3836
use crate::{
3937
async_reader::DaftAsyncFileReader,
40-
metadata::apply_field_ids_to_arrowrs_parquet_metadata,
41-
read::ParquetSchemaInferenceOptions,
38+
metadata::{
39+
apply_field_ids_to_arrowrs_parquet_metadata, strip_string_types_from_parquet_metadata,
40+
},
41+
read::{ParquetSchemaInferenceOptions, StringEncoding},
4242
schema_inference::{arrow_schema_to_daft_schema, infer_schema_from_parquet_metadata_arrowrs},
43-
statistics::arrowrs_row_group_metadata_to_table_stats,
43+
statistics::row_group_metadata_to_table_stats,
4444
};
4545

4646
/// Default batch size for the arrow-rs reader (number of rows per batch).
@@ -89,8 +89,7 @@ fn infer_schemas(
8989
let arrow_schema = infer_schema_from_parquet_metadata_arrowrs(
9090
parquet_metadata,
9191
Some(schema_infer_options.coerce_int96_timestamp_unit),
92-
schema_infer_options.string_encoding
93-
== daft_arrow::io::parquet::read::schema::StringEncoding::Raw,
92+
schema_infer_options.string_encoding == StringEncoding::Raw,
9493
)
9594
.map_err(parquet_err)?;
9695
let daft_schema = arrow_schema_to_daft_schema(&arrow_schema)?;
@@ -325,9 +324,8 @@ fn deletes_to_row_selection(local_deletes: &[usize], total_rows: usize) -> RowSe
325324
selectors.into()
326325
}
327326

328-
/// Read a single parquet file into a Daft [`RecordBatch`] using the arrow-rs reader.
327+
/// Read a single parquet file into a Daft [`RecordBatch`].
329328
///
330-
/// This is the arrow-rs equivalent of the parquet2-based `read_parquet_single`.
331329
/// When `predicate` and/or `delete_rows` are provided, the reader handles them
332330
/// internally using arrow-rs `RowFilter` and `RowSelection` for late materialization.
333331
///
@@ -339,10 +337,9 @@ fn deletes_to_row_selection(local_deletes: &[usize], total_rows: usize) -> RowSe
339337
/// offset (skip file rows) → predicate filter → limit
340338
///
341339
/// Note: `start_offset > 0` is rejected by the micropartition reader and never used
342-
/// in production (the streaming scan path doesn't even accept the parameter). The
343-
/// parquet2 reader has latent bugs for this case — both its local and remote paths
344-
/// produce RecordBatch size mismatches when `start_offset > 0`. Our implementation
345-
/// follows the intended semantics based on the code structure and the `apply_delete_rows`
340+
/// in production (the streaming scan path doesn't even accept the parameter). Our
341+
/// implementation follows the intended semantics based on the code structure and the
342+
/// `apply_delete_rows`
346343
/// docstring in `read.rs`, but there is no working reference implementation to compare
347344
/// against.
348345
#[allow(clippy::too_many_arguments)]
@@ -370,6 +367,13 @@ pub async fn read_parquet_single_arrowrs(
370367
parquet_metadata = apply_field_ids_to_arrowrs_parquet_metadata(parquet_metadata, mapping)?;
371368
}
372369

370+
// 1c. For StringEncoding::Raw, strip STRING/UTF8 logical types from the parquet
371+
// metadata so arrow-rs infers Binary instead of Utf8. This avoids UTF-8
372+
// validation during decode, allowing files with invalid UTF-8 to be read.
373+
if schema_infer_options.string_encoding == StringEncoding::Raw {
374+
parquet_metadata = strip_string_types_from_parquet_metadata(parquet_metadata)?;
375+
}
376+
373377
// 2. Infer schema with Daft options (INT96 coercion, string encoding).
374378
let (arrow_schema, daft_schema) = infer_schemas(&parquet_metadata, &schema_infer_options)?;
375379

@@ -672,6 +676,12 @@ pub(crate) fn local_parquet_setup(
672676
parquet_metadata = apply_field_ids_to_arrowrs_parquet_metadata(parquet_metadata, mapping)?;
673677
}
674678

679+
// 1c. For StringEncoding::Raw, strip STRING/UTF8 logical types so arrow-rs
680+
// reads BYTE_ARRAY as Binary (no UTF-8 validation).
681+
if schema_infer_options.string_encoding == StringEncoding::Raw {
682+
parquet_metadata = strip_string_types_from_parquet_metadata(parquet_metadata)?;
683+
}
684+
675685
// 2. Infer schema with Daft options.
676686
let (arrow_schema, daft_schema) = infer_schemas(&parquet_metadata, &schema_infer_options)?;
677687

@@ -876,8 +886,7 @@ pub(crate) fn decode_single_rg(
876886
///
877887
/// This avoids the overhead of `DaftAsyncFileReader` + `IOClient` for local files
878888
/// by using `std::fs::File` directly with `ParquetRecordBatchReaderBuilder`.
879-
/// Row groups are decoded in parallel using rayon, matching the parquet2 reader's
880-
/// parallelism strategy. Supports late materialization via `RowFilter` and
889+
/// Row groups are decoded in parallel using rayon. Supports late materialization via `RowFilter` and
881890
/// positional delete skipping via `RowSelection`.
882891
///
883892
/// See [`read_parquet_single_arrowrs`] for `start_offset` semantics.
@@ -964,8 +973,8 @@ pub fn local_parquet_read_arrowrs(
964973
/// Stream a local parquet file as Daft [`RecordBatch`]es using the sync arrow-rs reader,
965974
/// dispatching per-row-group decode as async tasks on the compute runtime.
966975
///
967-
/// Matches parquet2's `local_parquet_stream` pattern: sync metadata read, then
968-
/// per-RG tasks on the DAFTCPU pool with semaphore-gated parallelism.
976+
/// Performs sync metadata read, then per-RG tasks on the DAFTCPU pool with
977+
/// semaphore-gated parallelism.
969978
#[allow(clippy::too_many_arguments)]
970979
pub async fn local_parquet_stream_arrowrs(
971980
path: &str,
@@ -1009,8 +1018,7 @@ pub async fn local_parquet_stream_arrowrs(
10091018
}
10101019

10111020
// 2. Semaphore: limit concurrent RG decodes.
1012-
// Unlike parquet2 (which spawns per-column tasks and divides by num_columns),
1013-
// arrowrs decodes all columns in a single block_in_place call per RG,
1021+
// All columns are decoded in a single block_in_place call per RG,
10141022
// so concurrency is limited only by available CPUs.
10151023
let num_cpus = std::thread::available_parallelism()
10161024
.map(|n| n.get())
@@ -1073,7 +1081,6 @@ pub async fn local_parquet_stream_arrowrs(
10731081

10741082
/// Stream a single parquet file as Daft [`RecordBatch`]es using the arrow-rs reader.
10751083
///
1076-
/// This is the arrow-rs equivalent of the parquet2-based `stream_parquet_single`.
10771084
/// Supports late materialization via `RowFilter` and positional delete skipping
10781085
/// via `RowSelection`.
10791086
#[allow(clippy::too_many_arguments)]
@@ -1101,6 +1108,12 @@ pub async fn stream_parquet_single_arrowrs(
11011108
parquet_metadata = apply_field_ids_to_arrowrs_parquet_metadata(parquet_metadata, mapping)?;
11021109
}
11031110

1111+
// 1c. For StringEncoding::Raw, strip STRING/UTF8 logical types so arrow-rs
1112+
// reads BYTE_ARRAY as Binary (no UTF-8 validation).
1113+
if schema_infer_options.string_encoding == StringEncoding::Raw {
1114+
parquet_metadata = strip_string_types_from_parquet_metadata(parquet_metadata)?;
1115+
}
1116+
11041117
// 2. Infer schema with Daft options.
11051118
let (arrow_schema, daft_schema) = infer_schemas(&parquet_metadata, &schema_infer_options)?;
11061119

@@ -1296,7 +1309,7 @@ fn prune_row_groups(
12961309
let mut result = Vec::with_capacity(candidates.len());
12971310
for rg_idx in candidates {
12981311
let rg_meta = metadata.row_group(rg_idx);
1299-
match arrowrs_row_group_metadata_to_table_stats(rg_meta, schema) {
1312+
match row_group_metadata_to_table_stats(rg_meta, schema) {
13001313
Ok(stats) => {
13011314
let evaled = stats.eval_expression(&bound_pred)?;
13021315
if evaled.to_truth_value() != TruthValue::False {

src/daft-parquet/src/async_reader.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use parquet::{
1111

1212
use crate::read_planner::{CoalescePass, ReadPlanner, SplitLargeRequestPass};
1313

14-
// IO coalescing/splitting constants — these match the parquet2 reader in file.rs:384-391
15-
// so both paths have identical IO behavior.
14+
// IO coalescing/splitting constants for the read planner.
1615

1716
/// Maximum hole size for the coalesce pass (1 MB).
1817
/// Two byte ranges within this distance are merged into a single request,

0 commit comments

Comments
 (0)