Skip to content

Commit f30b81b

Browse files
authored
feat: add config option for skipping arrow ipc read validation (#1374)
* feat: add config option for skipping arrow ipc read validation * fix: remove unused dependencies
1 parent 08544f4 commit f30b81b

File tree

5 files changed

+21
-8
lines changed

5 files changed

+21
-8
lines changed

Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ rust-version = "1.88.0"
3333
arrow = { version = "57", features = ["ipc_compression"] }
3434
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
3535
clap = { version = "4.5", features = ["derive", "cargo"] }
36-
configure_me = { version = "0.4.0" }
37-
configure_me_codegen = { version = "0.4.4" }
3836
datafusion = "51.0.0"
3937
datafusion-cli = "51.0.0"
4038
datafusion-proto = "51.0.0"

ballista/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ exclude = ["*.proto"]
3535
rustc-args = ["--cfg", "docsrs"]
3636

3737
[features]
38+
arrow-ipc-optimizations = []
3839
build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"]
40+
default = ["arrow-ipc-optimizations"]
3941
docsrs = []
4042
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4143
force_hash_collisions = ["datafusion/force_hash_collisions"]

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -545,9 +545,17 @@ fn fetch_partition_local_inner(
545545
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
546546
})?;
547547
let file = BufReader::new(file);
548-
let reader = StreamReader::try_new(file, None).map_err(|e| {
549-
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
550-
})?;
548+
// Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
549+
let reader = unsafe {
550+
StreamReader::try_new(file, None)
551+
.map_err(|e| {
552+
BallistaError::General(format!(
553+
"Failed to create new arrow StreamReader at {path}: {e:?}"
554+
))
555+
})?
556+
.with_skip_validation(cfg!(feature = "arrow-ipc-optimizations"))
557+
};
558+
551559
Ok(reader)
552560
}
553561

ballista/executor/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ path = "src/bin/main.rs"
3333
required-features = ["build-binary"]
3434

3535
[features]
36+
arrow-ipc-optimizations = []
3637
build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
37-
default = ["build-binary", "mimalloc"]
38+
default = ["arrow-ipc-optimizations", "build-binary", "mimalloc"]
3839

3940
[dependencies]
4041
arrow = { workspace = true }

ballista/executor/src/flight_service.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,12 @@ impl FlightService for BallistaFlightService {
105105
})
106106
.map_err(|e| from_ballista_err(&e))?;
107107
let file = BufReader::new(file);
108-
let reader =
109-
StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;
108+
// Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
109+
let reader = unsafe {
110+
StreamReader::try_new(file, None)
111+
.map_err(|e| from_arrow_err(&e))?
112+
.with_skip_validation(cfg!(feature = "arrow-ipc-optimizations"))
113+
};
110114

111115
let (tx, rx) = channel(2);
112116
let schema = reader.schema();

0 commit comments

Comments
 (0)