Skip to content

Commit 7a7802c

Browse files
committed
feat: add config option for skipping arrow ipc read validation
1 parent 60182c8 commit 7a7802c

File tree

7 files changed

+210
-100
lines changed

7 files changed

+210
-100
lines changed

ballista/client/src/extension.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl SessionContextExt for SessionContext {
9393
async fn remote_with_state(
9494
url: &str,
9595
state: SessionState,
96-
) -> datafusion::error::Result<SessionContext> {
96+
) -> datafusion::error::Result<Self> {
9797
let scheduler_url = Extension::parse_url(url)?;
9898
log::info!(
9999
"Connecting to Ballista scheduler at {}",
@@ -110,7 +110,7 @@ impl SessionContextExt for SessionContext {
110110
Ok(SessionContext::new_with_state(session_state))
111111
}
112112

113-
async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
113+
async fn remote(url: &str) -> datafusion::error::Result<Self> {
114114
let scheduler_url = Extension::parse_url(url)?;
115115
log::info!(
116116
"Connecting to Ballista scheduler at: {}",
@@ -129,7 +129,9 @@ impl SessionContextExt for SessionContext {
129129
#[cfg(feature = "standalone")]
130130
async fn standalone_with_state(
131131
state: SessionState,
132-
) -> datafusion::error::Result<SessionContext> {
132+
) -> datafusion::error::Result<Self> {
133+
log::info!("Running in local mode. Scheduler will be run in-proc");
134+
133135
let scheduler_url = Extension::setup_standalone(Some(&state)).await?;
134136

135137
let session_state = state.upgrade_for_ballista(scheduler_url)?;

ballista/core/src/config.rs

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,43 @@ use datafusion::{
3131
pub const BALLISTA_JOB_NAME: &str = "ballista.job.name";
3232
/// Configuration key for standalone processing parallelism.
3333
pub const BALLISTA_STANDALONE_PARALLELISM: &str = "ballista.standalone.parallelism";
34+
35+
// Arrow IPC configuration
36+
37+
/// Configuration key for skipping redundant reader validation of Arrow IPC valid data.
38+
pub const BALLISTA_ARROW_IPC_READER_SKIP_VALIDATION: &str =
39+
"ballista.arrow.ipc.reader_skip_validation";
40+
41+
// gRPC configuration
42+
43+
/// Configuration key for gRPC client connection timeout in seconds.
44+
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
45+
"ballista.grpc.client.connect_timeout_seconds";
46+
/// Configuration key for HTTP/2 keep-alive interval for gRPC clients in seconds.
47+
pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
48+
"ballista.grpc.client.http2_keepalive_interval_seconds";
3449
/// max message size for gRPC clients
3550
pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE: &str =
3651
"ballista.grpc_client_max_message_size";
37-
/// Configuration key for maximum concurrent shuffle read requests.
38-
pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS: &str =
39-
"ballista.shuffle.max_concurrent_read_requests";
52+
/// Configuration key for TCP keep-alive interval for gRPC clients in seconds.
53+
pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
54+
"ballista.grpc.client.tcp_keepalive_seconds";
55+
/// Configuration key for gRPC client request timeout in seconds.
56+
pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
57+
"ballista.grpc.client.timeout_seconds";
58+
59+
// Shuffle reader configuration
60+
4061
/// Configuration key to force remote reads even for local partitions.
4162
pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
4263
"ballista.shuffle.force_remote_read";
64+
/// Configuration key for maximum concurrent shuffle read requests.
65+
pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS: &str =
66+
"ballista.shuffle.max_concurrent_read_requests";
4367
/// Configuration key to prefer Flight protocol for remote shuffle reads.
4468
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
4569
"ballista.shuffle.remote_read_prefer_flight";
4670

47-
/// Configuration key for gRPC client connection timeout in seconds.
48-
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
49-
"ballista.grpc.client.connect_timeout_seconds";
50-
/// Configuration key for gRPC client request timeout in seconds.
51-
pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
52-
"ballista.grpc.client.timeout_seconds";
53-
/// Configuration key for TCP keep-alive interval for gRPC clients in seconds.
54-
pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
55-
"ballista.grpc.client.tcp_keepalive_seconds";
56-
/// Configuration key for HTTP/2 keep-alive interval for gRPC clients in seconds.
57-
pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
58-
"ballista.grpc.client.http2_keepalive_interval_seconds";
59-
6071
/// Result type for configuration parsing operations.
6172
pub type ParseResult<T> = result::Result<T, String>;
6273
use std::sync::LazyLock;
@@ -69,38 +80,49 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
6980
ConfigEntry::new(BALLISTA_STANDALONE_PARALLELISM.to_string(),
7081
"Standalone processing parallelism ".to_string(),
7182
DataType::UInt16, Some(std::thread::available_parallelism().map(|v| v.get()).unwrap_or(1).to_string())),
83+
84+
// Arrow IPC configuration
85+
ConfigEntry::new(BALLISTA_ARROW_IPC_READER_SKIP_VALIDATION.to_string(),
86+
"Skip redundant reader validation of Arrow IPC valid data".to_string(),
87+
DataType::Boolean,
88+
Some(true.to_string())),
89+
90+
// gRPC configuration
91+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
92+
"Connection timeout for gRPC client in seconds".to_string(),
93+
DataType::UInt64,
94+
Some((20).to_string())),
95+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
96+
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
97+
DataType::UInt64,
98+
Some((300).to_string())),
7299
ConfigEntry::new(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE.to_string(),
73100
"Configuration for max message size in gRPC clients".to_string(),
74101
DataType::UInt64,
75102
Some((16 * 1024 * 1024).to_string())),
76-
ConfigEntry::new(BALLISTA_SHUFFLE_READER_MAX_REQUESTS.to_string(),
77-
"Maximum concurrent requests shuffle reader can process".to_string(),
103+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
104+
"TCP keep-alive interval for gRPC client in seconds".to_string(),
78105
DataType::UInt64,
79-
Some((64).to_string())),
106+
Some((3600).to_string())),
107+
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
108+
"Request timeout for gRPC client in seconds".to_string(),
109+
DataType::UInt64,
110+
Some((20).to_string())),
111+
112+
113+
// Shuffle reader configuration
80114
ConfigEntry::new(BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ.to_string(),
81115
"Forces the shuffle reader to always read partitions via the Arrow Flight client, even when partitions are local to the node.".to_string(),
82116
DataType::Boolean,
83117
Some((false).to_string())),
118+
ConfigEntry::new(BALLISTA_SHUFFLE_READER_MAX_REQUESTS.to_string(),
119+
"Maximum concurrent requests shuffle reader can process".to_string(),
120+
DataType::UInt64,
121+
Some((64).to_string())),
84122
ConfigEntry::new(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT.to_string(),
85123
"Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization".to_string(),
86124
DataType::Boolean,
87125
Some((false).to_string())),
88-
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
89-
"Connection timeout for gRPC client in seconds".to_string(),
90-
DataType::UInt64,
91-
Some((20).to_string())),
92-
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
93-
"Request timeout for gRPC client in seconds".to_string(),
94-
DataType::UInt64,
95-
Some((20).to_string())),
96-
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
97-
"TCP keep-alive interval for gRPC client in seconds".to_string(),
98-
DataType::UInt64,
99-
Some((3600).to_string())),
100-
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
101-
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
102-
DataType::UInt64,
103-
Some((300).to_string()))
104126
];
105127
entries
106128
.into_iter()
@@ -264,6 +286,11 @@ impl BallistaConfig {
264286
self.get_bool_setting(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT)
265287
}
266288

289+
/// Allows skipping redundant validation of arrow IPC valid data.
290+
pub fn ballista_arrow_ipc_reader_skip_validation(&self) -> bool {
291+
self.get_bool_setting(BALLISTA_ARROW_IPC_READER_SKIP_VALIDATION)
292+
}
293+
267294
fn get_usize_setting(&self, key: &str) -> usize {
268295
if let Some(v) = self.settings.get(key) {
269296
// infallible because we validate all configs in the constructor

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ impl ExecutionPlan for ShuffleReaderExec {
162162
let max_message_size = config.ballista_grpc_client_max_message_size();
163163
let force_remote_read = config.ballista_shuffle_reader_force_remote_read();
164164
let prefer_flight = config.ballista_shuffle_reader_remote_prefer_flight();
165+
let arrow_ipc_reader_skip_validation =
166+
config.ballista_arrow_ipc_reader_skip_validation();
165167

166168
if force_remote_read {
167169
debug!(
@@ -195,6 +197,7 @@ impl ExecutionPlan for ShuffleReaderExec {
195197
max_message_size,
196198
force_remote_read,
197199
prefer_flight,
200+
arrow_ipc_reader_skip_validation,
198201
);
199202

200203
let result = RecordBatchStreamAdapter::new(
@@ -390,6 +393,7 @@ fn send_fetch_partitions(
390393
max_message_size: usize,
391394
force_remote_read: bool,
392395
flight_transport: bool,
396+
arrow_ipc_reader_skip_validation: bool,
393397
) -> AbortableReceiverStream {
394398
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
395399
let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -409,7 +413,12 @@ fn send_fetch_partitions(
409413
spawned_tasks.push(SpawnedTask::spawn(async move {
410414
for p in local_locations {
411415
let r = PartitionReaderEnum::Local
412-
.fetch_partition(&p, max_message_size, flight_transport)
416+
.fetch_partition(
417+
&p,
418+
max_message_size,
419+
flight_transport,
420+
arrow_ipc_reader_skip_validation,
421+
)
413422
.await;
414423
if let Err(e) = response_sender_c.send(r).await {
415424
error!("Fail to send response event to the channel due to {e}");
@@ -424,7 +433,12 @@ fn send_fetch_partitions(
424433
// Block if exceeds max request number.
425434
let permit = semaphore.acquire_owned().await.unwrap();
426435
let r = PartitionReaderEnum::FlightRemote
427-
.fetch_partition(&p, max_message_size, flight_transport)
436+
.fetch_partition(
437+
&p,
438+
max_message_size,
439+
flight_transport,
440+
arrow_ipc_reader_skip_validation,
441+
)
428442
.await;
429443
// Block if the channel buffer is full.
430444
if let Err(e) = response_sender.send(r).await {
@@ -451,6 +465,7 @@ trait PartitionReader: Send + Sync + Clone {
451465
location: &PartitionLocation,
452466
max_message_size: usize,
453467
flight_transport: bool,
468+
arrow_ipc_reader_skip_validation: bool,
454469
) -> result::Result<SendableRecordBatchStream, BallistaError>;
455470
}
456471

@@ -470,12 +485,15 @@ impl PartitionReader for PartitionReaderEnum {
470485
location: &PartitionLocation,
471486
max_message_size: usize,
472487
flight_transport: bool,
488+
arrow_ipc_reader_skip_validation: bool,
473489
) -> result::Result<SendableRecordBatchStream, BallistaError> {
474490
match self {
475491
PartitionReaderEnum::FlightRemote => {
476492
fetch_partition_remote(location, max_message_size, flight_transport).await
477493
}
478-
PartitionReaderEnum::Local => fetch_partition_local(location).await,
494+
PartitionReaderEnum::Local => {
495+
fetch_partition_local(location, arrow_ipc_reader_skip_validation).await
496+
}
479497
PartitionReaderEnum::ObjectStoreRemote => {
480498
fetch_partition_object_store(location).await
481499
}
@@ -521,33 +539,44 @@ async fn fetch_partition_remote(
521539

522540
async fn fetch_partition_local(
523541
location: &PartitionLocation,
542+
arrow_ipc_reader_skip_validation: bool,
524543
) -> result::Result<SendableRecordBatchStream, BallistaError> {
525544
let path = &location.path;
526545
let metadata = &location.executor_meta;
527546
let partition_id = &location.partition_id;
528547

529-
let reader = fetch_partition_local_inner(path).map_err(|e| {
530-
// return BallistaError::FetchFailed may let scheduler retry this task.
531-
BallistaError::FetchFailed(
532-
metadata.id.clone(),
533-
partition_id.stage_id,
534-
partition_id.partition_id,
535-
e.to_string(),
536-
)
537-
})?;
548+
let reader = fetch_partition_local_inner(path, arrow_ipc_reader_skip_validation)
549+
.map_err(|e| {
550+
// return BallistaError::FetchFailed may let scheduler retry this task.
551+
BallistaError::FetchFailed(
552+
metadata.id.clone(),
553+
partition_id.stage_id,
554+
partition_id.partition_id,
555+
e.to_string(),
556+
)
557+
})?;
538558
Ok(Box::pin(LocalShuffleStream::new(reader)))
539559
}
540560

541561
fn fetch_partition_local_inner(
542562
path: &str,
563+
arrow_ipc_reader_skip_validation: bool,
543564
) -> result::Result<StreamReader<BufReader<File>>, BallistaError> {
544565
let file = File::open(path).map_err(|e| {
545566
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
546567
})?;
547568
let file = BufReader::new(file);
548-
let reader = StreamReader::try_new(file, None).map_err(|e| {
549-
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
550-
})?;
569+
// Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
570+
let reader = unsafe {
571+
StreamReader::try_new(file, None)
572+
.map_err(|e| {
573+
BallistaError::General(format!(
574+
"Failed to create new arrow StreamReader at {path}: {e:?}"
575+
))
576+
})?
577+
.with_skip_validation(arrow_ipc_reader_skip_validation)
578+
};
579+
551580
Ok(reader)
552581
}
553582

@@ -881,7 +910,7 @@ mod tests {
881910

882911
// from to input partitions test the first one with two batches
883912
let file_path = path.value(0);
884-
let reader = fetch_partition_local_inner(file_path).unwrap();
913+
let reader = fetch_partition_local_inner(file_path, true).unwrap();
885914

886915
let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
887916
async { Box::pin(LocalShuffleStream::new(reader)) }.await;
@@ -951,6 +980,7 @@ mod tests {
951980
4 * 1024 * 1024,
952981
false,
953982
true,
983+
true,
954984
);
955985

956986
let stream = RecordBatchStreamAdapter::new(

0 commit comments

Comments
 (0)