Skip to content

Commit eeaab58

Browse files
committed
feat: Improve fetch partition performance, support skip validation arrow ipc files
1 parent c8f6423 commit eeaab58

File tree

13 files changed

+150
-17
lines changed

13 files changed

+150
-17
lines changed

ballista/client/tests/context_checks.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,42 @@ mod supported {
134134
Ok(())
135135
}
136136

137+
#[rstest]
138+
#[case::standalone(standalone_context())]
139+
#[case::remote(remote_context())]
140+
#[tokio::test]
141+
async fn should_execute_sql_show_config_ballista_shuffle_skip_validation(
142+
#[future(awt)]
143+
#[case]
144+
ctx: SessionContext,
145+
) -> datafusion::error::Result<()> {
146+
let state = ctx.state();
147+
let ballista_config_extension =
148+
state.config().options().extensions.get::<BallistaConfig>();
149+
150+
// ballista configuration should be registered with
151+
// session state
152+
assert!(ballista_config_extension.is_some());
153+
154+
let result = ctx
155+
.sql("select name, value from information_schema.df_settings where name = 'ballista.shuffle.skip_validation'")
156+
.await?
157+
.collect()
158+
.await?;
159+
160+
let expected = [
161+
"+----------------------------------+-------+",
162+
"| name | value |",
163+
"+----------------------------------+-------+",
164+
"| ballista.shuffle.skip_validation | true |",
165+
"+----------------------------------+-------+",
166+
];
167+
168+
assert_batches_eq!(expected, &result);
169+
170+
Ok(())
171+
}
172+
137173
#[rstest]
138174
#[case::standalone(standalone_context())]
139175
#[case::remote(remote_context())]

ballista/core/proto/ballista.proto

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ message FetchPartition {
205205
string path = 4;
206206
string host = 5;
207207
uint32 port = 6;
208+
bool skip_validation = 7;
208209
}
209210

210211
message PartitionLocation {
@@ -215,6 +216,7 @@ message PartitionLocation {
215216
ExecutorMetadata executor_meta = 3;
216217
PartitionStats partition_stats = 4;
217218
string path = 5;
219+
bool skip_validation = 6;
218220
}
219221

220222
// Unique identifier for a materialized partition of data
@@ -290,7 +292,7 @@ message ExecutorMetadata {
290292
}
291293

292294

293-
// Used for scheduler-executor
295+
// Used for scheduler-executor
294296
// communication
295297
message ExecutorRegistration {
296298
string id = 1;
@@ -523,7 +525,7 @@ message ExecuteQueryParams {
523525
bytes logical_plan = 1;
524526
string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
525527
}
526-
528+
527529
optional string session_id = 3;
528530
repeated KeyValuePair settings = 4;
529531
}

ballista/core/src/client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ impl BallistaClient {
8989
path: &str,
9090
host: &str,
9191
port: u16,
92+
skip_validation: bool,
9293
) -> Result<SendableRecordBatchStream> {
9394
let action = Action::FetchPartition {
9495
job_id: partition_id.job_id.clone(),
@@ -97,6 +98,7 @@ impl BallistaClient {
9798
path: path.to_owned(),
9899
host: host.to_owned(),
99100
port,
101+
skip_validation,
100102
};
101103
self.execute_action(&action)
102104
.await

ballista/core/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE: &str =
3434
"ballista.grpc_client_max_message_size";
3535
pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS: &str =
3636
"ballista.shuffle.max_concurrent_read_requests";
37+
pub const BALLISTA_SHUFFLE_READER_SKIP_VALIDATION: &str =
38+
"ballista.shuffle.skip_validation";
3739

3840
pub type ParseResult<T> = result::Result<T, String>;
3941
use std::sync::LazyLock;
@@ -54,6 +56,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
5456
"Maximum concurrent requests shuffle reader can process".to_string(),
5557
DataType::UInt64,
5658
Some((64).to_string())),
59+
ConfigEntry::new(BALLISTA_SHUFFLE_READER_SKIP_VALIDATION.to_string(),
60+
"Enable skip validation for Arrow IPC file when fetch partition".to_string(),
61+
DataType::Boolean,
62+
Some(true.to_string())),
5763
];
5864
entries
5965
.into_iter()
@@ -175,6 +181,10 @@ impl BallistaConfig {
175181
self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
176182
}
177183

184+
pub fn shuffle_reader_skip_validation(&self) -> bool {
185+
self.get_bool_setting(BALLISTA_SHUFFLE_READER_SKIP_VALIDATION)
186+
}
187+
178188
fn get_usize_setting(&self, key: &str) -> usize {
179189
if let Some(v) = self.settings.get(key) {
180190
// infallible because we validate all configs in the constructor

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ async fn fetch_partition(
337337
let partition_id = location.partition_id.ok_or_else(|| {
338338
DataFusionError::Internal("Received empty partition id".to_owned())
339339
})?;
340+
let skip_validation = location.skip_validation;
340341
let host = metadata.host.as_str();
341342
let port = metadata.port as u16;
342343
let mut ballista_client = BallistaClient::try_new(host, port, max_message_size)
@@ -349,6 +350,7 @@ async fn fetch_partition(
349350
&location.path,
350351
host,
351352
port,
353+
skip_validation,
352354
)
353355
.await
354356
.map_err(|e| DataFusionError::External(Box::new(e)))

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,14 @@ impl ExecutionPlan for ShuffleReaderExec {
152152
let max_request_num =
153153
config.ballista_shuffle_reader_maximum_concurrent_requests();
154154
let max_message_size = config.ballista_grpc_client_max_message_size();
155+
let skip_validation = config.ballista_shuffle_reader_skip_validation();
155156

156157
log::debug!(
157-
"ShuffleReaderExec::execute({}) max_request_num: {}, max_message_size: {}",
158+
"ShuffleReaderExec::execute({}) max_request_num: {}, max_message_size: {}, skip_validation: {}",
158159
task_id,
159160
max_request_num,
160-
max_message_size
161+
max_message_size,
162+
skip_validation
161163
);
162164
let mut partition_locations = HashMap::new();
163165
for p in &self.partition[partition] {
@@ -175,6 +177,9 @@ impl ExecutionPlan for ShuffleReaderExec {
175177
.collect();
176178
// Shuffle partitions for evenly send fetching partition requests to avoid hot executors within multiple tasks
177179
partition_locations.shuffle(&mut rng());
180+
partition_locations
181+
.iter_mut()
182+
.for_each(|p| p.skip_validation = skip_validation);
178183

179184
let response_receiver =
180185
send_fetch_partitions(partition_locations, max_request_num, max_message_size);
@@ -393,6 +398,7 @@ async fn fetch_partition_remote(
393398
) -> result::Result<SendableRecordBatchStream, BallistaError> {
394399
let metadata = &location.executor_meta;
395400
let partition_id = &location.partition_id;
401+
let skip_validation = location.skip_validation;
396402
// TODO for shuffle client connections, we should avoid creating new connections again and again.
397403
// And we should also avoid to keep alive too many connections for long time.
398404
let host = metadata.host.as_str();
@@ -411,7 +417,14 @@ async fn fetch_partition_remote(
411417
})?;
412418

413419
ballista_client
414-
.fetch_partition(&metadata.id, partition_id, &location.path, host, port)
420+
.fetch_partition(
421+
&metadata.id,
422+
partition_id,
423+
&location.path,
424+
host,
425+
port,
426+
skip_validation,
427+
)
415428
.await
416429
}
417430

@@ -421,8 +434,9 @@ async fn fetch_partition_local(
421434
let path = &location.path;
422435
let metadata = &location.executor_meta;
423436
let partition_id = &location.partition_id;
437+
let skip_validation = location.skip_validation;
424438

425-
let reader = fetch_partition_local_inner(path).map_err(|e| {
439+
let reader = fetch_partition_local_inner(path, skip_validation).map_err(|e| {
426440
// return BallistaError::FetchFailed may let scheduler retry this task.
427441
BallistaError::FetchFailed(
428442
metadata.id.clone(),
@@ -436,14 +450,22 @@ async fn fetch_partition_local(
436450

437451
fn fetch_partition_local_inner(
438452
path: &str,
453+
skip_validation: bool,
439454
) -> result::Result<StreamReader<BufReader<File>>, BallistaError> {
440455
let file = File::open(path).map_err(|e| {
441456
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
442457
})?;
443458
let file = BufReader::new(file);
444-
let reader = StreamReader::try_new(file, None).map_err(|e| {
445-
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
446-
})?;
459+
// Safety: The caller makes sure ipc file is valid
460+
let reader = unsafe {
461+
StreamReader::try_new(file, None)
462+
.map_err(|e| {
463+
BallistaError::General(format!(
464+
"Failed to new arrow FileReader at {path}: {e:?}"
465+
))
466+
})?
467+
.with_skip_validation(skip_validation)
468+
};
447469
Ok(reader)
448470
}
449471

@@ -567,6 +589,7 @@ mod tests {
567589
},
568590
partition_stats: Default::default(),
569591
path: "test_path".to_string(),
592+
skip_validation: true,
570593
})
571594
}
572595

@@ -601,8 +624,7 @@ mod tests {
601624
test_send_fetch_partitions(4, 10).await;
602625
}
603626

604-
#[tokio::test]
605-
async fn test_read_local_shuffle() {
627+
async fn test_read_local_shuffle_impl(skip_validation: bool) {
606628
let session_ctx = SessionContext::new();
607629
let task_ctx = session_ctx.task_ctx();
608630
let work_dir = TempDir::new().unwrap();
@@ -629,7 +651,7 @@ mod tests {
629651

630652
// from to input partitions test the first one with two batches
631653
let file_path = path.value(0);
632-
let reader = fetch_partition_local_inner(file_path).unwrap();
654+
let reader = fetch_partition_local_inner(file_path, skip_validation).unwrap();
633655

634656
let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
635657
async { Box::pin(LocalShuffleStream::new(reader)) }.await;
@@ -645,6 +667,16 @@ mod tests {
645667
}
646668
}
647669

670+
#[tokio::test]
671+
async fn test_read_local_shuffle_with_validation() {
672+
test_read_local_shuffle_impl(false).await;
673+
}
674+
675+
#[tokio::test]
676+
async fn test_read_local_shuffle_skip_validation() {
677+
test_read_local_shuffle_impl(true).await;
678+
}
679+
648680
async fn test_send_fetch_partitions(max_request_num: usize, partition_num: usize) {
649681
let schema = get_test_partition_schema();
650682
let data_array = Int32Array::from(vec![1]);
@@ -693,6 +725,7 @@ mod tests {
693725
},
694726
partition_stats: Default::default(),
695727
path: path.clone(),
728+
skip_validation: true,
696729
})
697730
.collect()
698731
}

ballista/core/src/extension.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use crate::config::{
1919
BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
20-
BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_STANDALONE_PARALLELISM,
20+
BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_SHUFFLE_READER_SKIP_VALIDATION,
21+
BALLISTA_STANDALONE_PARALLELISM,
2122
};
2223
use crate::planner::BallistaQueryPlanner;
2324
use crate::serde::protobuf::KeyValuePair;
@@ -112,6 +113,12 @@ pub trait SessionConfigExt {
112113
self,
113114
max_requests: usize,
114115
) -> Self;
116+
117+
/// get skip_validation in flight requests for shuffle reader
118+
fn ballista_shuffle_reader_skip_validation(&self) -> bool;
119+
120+
/// set skip_validation in flight requests for shuffle reader
121+
fn with_ballista_shuffle_reader_skip_validation(self, skip_validation: bool) -> Self;
115122
}
116123

117124
/// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -308,6 +315,23 @@ impl SessionConfigExt for SessionConfig {
308315
.set_usize(BALLISTA_SHUFFLE_READER_MAX_REQUESTS, max_requests)
309316
}
310317
}
318+
319+
fn ballista_shuffle_reader_skip_validation(&self) -> bool {
320+
self.options()
321+
.extensions
322+
.get::<BallistaConfig>()
323+
.map(|c| c.shuffle_reader_skip_validation())
324+
.unwrap_or_else(|| BallistaConfig::default().shuffle_reader_skip_validation())
325+
}
326+
327+
fn with_ballista_shuffle_reader_skip_validation(self, skip_validation: bool) -> Self {
328+
if self.options().extensions.get::<BallistaConfig>().is_some() {
329+
self.set_bool(BALLISTA_SHUFFLE_READER_SKIP_VALIDATION, skip_validation)
330+
} else {
331+
self.with_option_extension(BallistaConfig::default())
332+
.set_bool(BALLISTA_SHUFFLE_READER_SKIP_VALIDATION, skip_validation)
333+
}
334+
}
311335
}
312336

313337
impl SessionConfigHelperExt for SessionConfig {

ballista/core/src/serde/generated/ballista.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ pub struct FetchPartition {
304304
pub host: ::prost::alloc::string::String,
305305
#[prost(uint32, tag = "6")]
306306
pub port: u32,
307+
#[prost(bool, tag = "7")]
308+
pub skip_validation: bool,
307309
}
308310
#[derive(Clone, PartialEq, ::prost::Message)]
309311
pub struct PartitionLocation {
@@ -319,6 +321,8 @@ pub struct PartitionLocation {
319321
pub partition_stats: ::core::option::Option<PartitionStats>,
320322
#[prost(string, tag = "5")]
321323
pub path: ::prost::alloc::string::String,
324+
#[prost(bool, tag = "6")]
325+
pub skip_validation: bool,
322326
}
323327
/// Unique identifier for a materialized partition of data
324328
#[derive(Clone, PartialEq, ::prost::Message)]

ballista/core/src/serde/scheduler/from_proto.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl TryInto<Action> for protobuf::Action {
5656
path: fetch.path,
5757
host: fetch.host,
5858
port: fetch.port as u16,
59+
skip_validation: fetch.skip_validation,
5960
})
6061
}
6162
_ => Err(BallistaError::General(
@@ -126,6 +127,7 @@ impl TryInto<PartitionLocation> for protobuf::PartitionLocation {
126127
})?
127128
.into(),
128129
path: self.path,
130+
skip_validation: self.skip_validation,
129131
})
130132
}
131133
}

ballista/core/src/serde/scheduler/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub enum Action {
4242
path: String,
4343
host: String,
4444
port: u16,
45+
// Skip validation for Arrow IPC file when fetch partition
46+
skip_validation: bool,
4547
},
4648
}
4749

@@ -70,6 +72,7 @@ pub struct PartitionLocation {
7072
pub executor_meta: ExecutorMetadata,
7173
pub partition_stats: PartitionStats,
7274
pub path: String,
75+
pub skip_validation: bool,
7376
}
7477

7578
/// Meta-data for an executor, used when fetching shuffle partitions from other executors

0 commit comments

Comments
 (0)