Skip to content

Commit 972dbd0

Browse files
committed
feat: Improve fetch partition performance, support skip validation arrow ipc files
1 parent f6b159e commit 972dbd0

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,16 @@ fn fetch_partition_local_inner(
441441
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
442442
})?;
443443
let file = BufReader::new(file);
444-
let reader = StreamReader::try_new(file, None).map_err(|e| {
445-
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
446-
})?;
444+
// Safety: The ShuffleWriter partition file is valid
445+
let reader = unsafe {
446+
StreamReader::try_new(file, None)
447+
.map_err(|e| {
448+
BallistaError::General(format!(
449+
"Failed to new arrow FileReader at {path}: {e:?}"
450+
))
451+
})?
452+
.with_skip_validation(true)
453+
};
447454
Ok(reader)
448455
}
449456

ballista/executor/src/flight_service.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,12 @@ impl FlightService for BallistaFlightService {
9595
})
9696
.map_err(|e| from_ballista_err(&e))?;
9797
let file = BufReader::new(file);
98-
let reader =
99-
StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;
100-
98+
// Safety: The ShuffleWriter partition file is valid
99+
let reader = unsafe {
100+
StreamReader::try_new(file, None)
101+
.map_err(|e| from_arrow_err(&e))?
102+
.with_skip_validation(true)
103+
};
101104
let (tx, rx) = channel(2);
102105
let schema = reader.schema();
103106
task::spawn_blocking(move || {

0 commit comments

Comments
 (0)