Skip to content

Commit 3578245

Browse files
authored
feat: configure max grpc message size and disable view types in ballista (#1185)
* use configuration to propagate grpc settings * set `datafusion.execution.parquet.schema_force_view_types = false` * return better status when flight request fails * fix issue with variable assignment * fix typo and file name * add test to verify switching off configuration
1 parent faa05af commit 3578245

File tree

11 files changed

+247
-130
lines changed

11 files changed

+247
-130
lines changed

ballista/client/tests/context_checks.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,4 +365,36 @@ mod supported {
365365

366366
Ok(())
367367
}
368+
369+
// test checks if this view types have been disabled in the configuration
370+
//
371+
// `datafusion.execution.parquet.schema_force_view_types` have been disabled
372+
// temporary as they could break shuffle reader/writer.
373+
#[rstest]
374+
#[case::standalone(standalone_context())]
375+
#[case::remote(remote_context())]
376+
#[tokio::test]
377+
async fn should_disable_view_types(
378+
#[future(awt)]
379+
#[case]
380+
ctx: SessionContext,
381+
) -> datafusion::error::Result<()> {
382+
let result = ctx
383+
.sql("select name, value from information_schema.df_settings where name like 'datafusion.execution.parquet.schema_force_view_types' order by name limit 1")
384+
.await?
385+
.collect()
386+
.await?;
387+
//
388+
let expected = [
389+
"+------------------------------------------------------+-------+",
390+
"| name | value |",
391+
"+------------------------------------------------------+-------+",
392+
"| datafusion.execution.parquet.schema_force_view_types | false |",
393+
"+------------------------------------------------------+-------+",
394+
];
395+
396+
assert_batches_eq!(expected, &result);
397+
398+
Ok(())
399+
}
368400
}

ballista/core/src/client.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const IO_RETRY_WAIT_TIME_MS: u64 = 3000;
6161
impl BallistaClient {
6262
/// Create a new BallistaClient to connect to the executor listening on the specified
6363
/// host and port
64-
pub async fn try_new(host: &str, port: u16) -> Result<Self> {
64+
pub async fn try_new(host: &str, port: u16, max_message_size: usize) -> Result<Self> {
6565
let addr = format!("http://{host}:{port}");
6666
debug!("BallistaClient connecting to {}", addr);
6767
let connection =
@@ -72,8 +72,11 @@ impl BallistaClient {
7272
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
7373
))
7474
})?;
75-
let flight_client = FlightServiceClient::new(connection);
76-
debug!("BallistaClient connected OK");
75+
let flight_client = FlightServiceClient::new(connection)
76+
.max_decoding_message_size(max_message_size)
77+
.max_encoding_message_size(max_message_size);
78+
79+
debug!("BallistaClient connected OK: {:?}", flight_client);
7780

7881
Ok(Self { flight_client })
7982
}
@@ -99,13 +102,27 @@ impl BallistaClient {
99102
.await
100103
.map_err(|error| match error {
101104
// map grpc connection error to partition fetch error.
102-
BallistaError::GrpcActionError(msg) => BallistaError::FetchFailed(
103-
executor_id.to_owned(),
104-
partition_id.stage_id,
105-
partition_id.partition_id,
106-
msg,
107-
),
108-
other => other,
105+
BallistaError::GrpcActionError(msg) => {
106+
log::warn!(
107+
"grpc client failed to fetch partition: {:?} , message: {:?}",
108+
partition_id,
109+
msg
110+
);
111+
BallistaError::FetchFailed(
112+
executor_id.to_owned(),
113+
partition_id.stage_id,
114+
partition_id.partition_id,
115+
msg,
116+
)
117+
}
118+
error => {
119+
log::warn!(
120+
"grpc client failed to fetch partition: {:?} , error: {:?}",
121+
partition_id,
122+
error
123+
);
124+
error
125+
}
109126
})
110127
}
111128

ballista/core/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ pub const BALLISTA_STANDALONE_PARALLELISM: &str = "ballista.standalone.paralleli
3232
/// max message size for gRPC clients
3333
pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE: &str =
3434
"ballista.grpc_client_max_message_size";
35+
pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS: &str =
36+
"ballista.shuffle.max_concurrent_read_requests";
3537

3638
pub type ParseResult<T> = result::Result<T, String>;
3739
use std::sync::LazyLock;
@@ -48,6 +50,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
4850
"Configuration for max message size in gRPC clients".to_string(),
4951
DataType::UInt64,
5052
Some((16 * 1024 * 1024).to_string())),
53+
ConfigEntry::new(BALLISTA_SHUFFLE_READER_MAX_REQUESTS.to_string(),
54+
"Maximum concurrent requests shuffle reader can process".to_string(),
55+
DataType::UInt64,
56+
Some((64).to_string())),
5157
];
5258
entries
5359
.into_iter()
@@ -165,6 +171,10 @@ impl BallistaConfig {
165171
self.get_usize_setting(BALLISTA_STANDALONE_PARALLELISM)
166172
}
167173

174+
pub fn shuffle_reader_maximum_concurrent_requests(&self) -> usize {
175+
self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
176+
}
177+
168178
fn get_usize_setting(&self, key: &str) -> usize {
169179
if let Some(v) = self.settings.get(key) {
170180
// infallible because we validate all configs in the constructor

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,16 @@ async fn execute_query(
310310
break Err(DataFusionError::Execution(msg));
311311
}
312312
Some(job_status::Status::Successful(successful)) => {
313-
let streams = successful.partition_location.into_iter().map(|p| {
314-
let f = fetch_partition(p)
315-
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
313+
let streams =
314+
successful
315+
.partition_location
316+
.into_iter()
317+
.map(move |partition| {
318+
let f = fetch_partition(partition, max_message_size)
319+
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
316320

317-
futures::stream::once(f).try_flatten()
318-
});
321+
futures::stream::once(f).try_flatten()
322+
});
319323

320324
break Ok(futures::stream::iter(streams).flatten());
321325
}
@@ -325,6 +329,7 @@ async fn execute_query(
325329

326330
async fn fetch_partition(
327331
location: PartitionLocation,
332+
max_message_size: usize,
328333
) -> Result<SendableRecordBatchStream> {
329334
let metadata = location.executor_meta.ok_or_else(|| {
330335
DataFusionError::Internal("Received empty executor metadata".to_owned())
@@ -334,7 +339,7 @@ async fn fetch_partition(
334339
})?;
335340
let host = metadata.host.as_str();
336341
let port = metadata.port as u16;
337-
let mut ballista_client = BallistaClient::try_new(host, port)
342+
let mut ballista_client = BallistaClient::try_new(host, port, max_message_size)
338343
.await
339344
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
340345
ballista_client

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::sync::Arc;
2929
use std::task::{Context, Poll};
3030

3131
use crate::client::BallistaClient;
32+
use crate::extension::SessionConfigExt;
3233
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
3334

3435
use datafusion::arrow::datatypes::SchemaRef;
@@ -146,8 +147,18 @@ impl ExecutionPlan for ShuffleReaderExec {
146147
let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
147148
info!("ShuffleReaderExec::execute({})", task_id);
148149

149-
// TODO make the maximum size configurable, or make it depends on global memory control
150-
let max_request_num = 50usize;
150+
let config = context.session_config();
151+
152+
let max_request_num =
153+
config.ballista_shuffle_reader_maximum_concurrent_requests();
154+
let max_message_size = config.ballista_grpc_client_max_message_size();
155+
156+
log::debug!(
157+
"ShuffleReaderExec::execute({}) max_request_num: {}, max_message_size: {}",
158+
task_id,
159+
max_request_num,
160+
max_message_size
161+
);
151162
let mut partition_locations = HashMap::new();
152163
for p in &self.partition[partition] {
153164
partition_locations
@@ -166,7 +177,7 @@ impl ExecutionPlan for ShuffleReaderExec {
166177
partition_locations.shuffle(&mut thread_rng());
167178

168179
let response_receiver =
169-
send_fetch_partitions(partition_locations, max_request_num);
180+
send_fetch_partitions(partition_locations, max_request_num, max_message_size);
170181

171182
let result = RecordBatchStreamAdapter::new(
172183
Arc::new(self.schema.as_ref().clone()),
@@ -284,6 +295,7 @@ impl Stream for AbortableReceiverStream {
284295
fn send_fetch_partitions(
285296
partition_locations: Vec<PartitionLocation>,
286297
max_request_num: usize,
298+
max_message_size: usize,
287299
) -> AbortableReceiverStream {
288300
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
289301
let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -302,7 +314,9 @@ fn send_fetch_partitions(
302314
let response_sender_c = response_sender.clone();
303315
spawned_tasks.push(SpawnedTask::spawn(async move {
304316
for p in local_locations {
305-
let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
317+
let r = PartitionReaderEnum::Local
318+
.fetch_partition(&p, max_message_size)
319+
.await;
306320
if let Err(e) = response_sender_c.send(r).await {
307321
error!("Fail to send response event to the channel due to {}", e);
308322
}
@@ -315,7 +329,9 @@ fn send_fetch_partitions(
315329
spawned_tasks.push(SpawnedTask::spawn(async move {
316330
// Block if exceeds max request number.
317331
let permit = semaphore.acquire_owned().await.unwrap();
318-
let r = PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
332+
let r = PartitionReaderEnum::FlightRemote
333+
.fetch_partition(&p, max_message_size)
334+
.await;
319335
// Block if the channel buffer is full.
320336
if let Err(e) = response_sender.send(r).await {
321337
error!("Fail to send response event to the channel due to {}", e);
@@ -339,6 +355,7 @@ trait PartitionReader: Send + Sync + Clone {
339355
async fn fetch_partition(
340356
&self,
341357
location: &PartitionLocation,
358+
max_message_size: usize,
342359
) -> result::Result<SendableRecordBatchStream, BallistaError>;
343360
}
344361

@@ -356,9 +373,12 @@ impl PartitionReader for PartitionReaderEnum {
356373
async fn fetch_partition(
357374
&self,
358375
location: &PartitionLocation,
376+
max_message_size: usize,
359377
) -> result::Result<SendableRecordBatchStream, BallistaError> {
360378
match self {
361-
PartitionReaderEnum::FlightRemote => fetch_partition_remote(location).await,
379+
PartitionReaderEnum::FlightRemote => {
380+
fetch_partition_remote(location, max_message_size).await
381+
}
362382
PartitionReaderEnum::Local => fetch_partition_local(location).await,
363383
PartitionReaderEnum::ObjectStoreRemote => {
364384
fetch_partition_object_store(location).await
@@ -369,26 +389,26 @@ impl PartitionReader for PartitionReaderEnum {
369389

370390
async fn fetch_partition_remote(
371391
location: &PartitionLocation,
392+
max_message_size: usize,
372393
) -> result::Result<SendableRecordBatchStream, BallistaError> {
373394
let metadata = &location.executor_meta;
374395
let partition_id = &location.partition_id;
375396
// TODO for shuffle client connections, we should avoid creating new connections again and again.
376397
// And we should also avoid to keep alive too many connections for long time.
377398
let host = metadata.host.as_str();
378399
let port = metadata.port;
379-
let mut ballista_client =
380-
BallistaClient::try_new(host, port)
381-
.await
382-
.map_err(|error| match error {
383-
// map grpc connection error to partition fetch error.
384-
BallistaError::GrpcConnectionError(msg) => BallistaError::FetchFailed(
385-
metadata.id.clone(),
386-
partition_id.stage_id,
387-
partition_id.partition_id,
388-
msg,
389-
),
390-
other => other,
391-
})?;
400+
let mut ballista_client = BallistaClient::try_new(host, port, max_message_size)
401+
.await
402+
.map_err(|error| match error {
403+
// map grpc connection error to partition fetch error.
404+
BallistaError::GrpcConnectionError(msg) => BallistaError::FetchFailed(
405+
metadata.id.clone(),
406+
partition_id.stage_id,
407+
partition_id.partition_id,
408+
msg,
409+
),
410+
other => other,
411+
})?;
392412

393413
ballista_client
394414
.fetch_partition(&metadata.id, partition_id, &location.path, host, port)
@@ -644,7 +664,7 @@ mod tests {
644664
);
645665

646666
let response_receiver =
647-
send_fetch_partitions(partition_locations, max_request_num);
667+
send_fetch_partitions(partition_locations, max_request_num, 4 * 1024 * 1024);
648668

649669
let stream = RecordBatchStreamAdapter::new(
650670
Arc::new(schema),

0 commit comments

Comments
 (0)