Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,17 @@ mod supported {
assert!(ballista_config_extension.is_some());

let result = ctx
.sql("select name, value from information_schema.df_settings where name like 'ballista.%' order by name limit 2")
.sql("select name, value from information_schema.df_settings where name = 'ballista.job.name'")
.await?
.collect()
.await?;

let expected = [
"+---------------------------------------+----------+",
"| name | value |",
"+---------------------------------------+----------+",
"| ballista.grpc_client_max_message_size | 16777216 |",
"| ballista.job.name | |",
"+---------------------------------------+----------+",
"+-------------------+-------+",
"| name | value |",
"+-------------------+-------+",
"| ballista.job.name | |",
"+-------------------+-------+",
];

assert_batches_eq!(expected, &result);
Expand Down
14 changes: 7 additions & 7 deletions ballista/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
use datafusion::error::Result;

use crate::serde::protobuf;
use crate::utils::create_grpc_client_connection;
use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::{debug, warn};
Expand All @@ -71,15 +71,15 @@ impl BallistaClient {
max_message_size: usize,
) -> BResult<Self> {
let addr = format!("http://{host}:{port}");
let grpc_config = GrpcClientConfig::default();
debug!("BallistaClient connecting to {addr}");
let connection =
create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
let connection = create_grpc_client_connection(addr.clone(), &grpc_config)
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
))
})?;
})?;
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
Expand Down
47 changes: 44 additions & 3 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";

// gRPC client timeout configurations
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
"ballista.grpc.client.connect_timeout_seconds";
pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
"ballista.grpc.client.timeout_seconds";
pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
"ballista.grpc.client.tcp_keepalive_seconds";
pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
"ballista.grpc.client.http2_keepalive_interval_seconds";

pub type ParseResult<T> = result::Result<T, String>;
use std::sync::LazyLock;

Expand All @@ -48,8 +58,8 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Sets the job name that will appear in the web user interface for any submitted jobs".to_string(),
DataType::Utf8, None),
ConfigEntry::new(BALLISTA_STANDALONE_PARALLELISM.to_string(),
"Standalone processing parallelism ".to_string(),
DataType::UInt16, Some(std::thread::available_parallelism().map(|v| v.get()).unwrap_or(1).to_string())),
"Standalone processing parallelism ".to_string(),
DataType::UInt16, Some(std::thread::available_parallelism().map(|v| v.get()).unwrap_or(1).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE.to_string(),
"Configuration for max message size in gRPC clients".to_string(),
DataType::UInt64,
Expand All @@ -66,7 +76,22 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization".to_string(),
DataType::Boolean,
Some((false).to_string())),

ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
"Connection timeout for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((20).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
"Request timeout for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((20).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
"TCP keep-alive interval for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((3600).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((300).to_string()))
];
entries
.into_iter()
Expand Down Expand Up @@ -188,6 +213,22 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
}

pub fn default_grpc_client_connect_timeout_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS)
}

pub fn default_grpc_client_timeout_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS)
}

pub fn default_grpc_client_tcp_keepalive_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS)
}

pub fn default_grpc_client_http2_keepalive_interval_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS)
}

/// Forces the shuffle reader to always read partitions via the Arrow Flight client,
/// even when partitions are local to the node.
///
Expand Down
6 changes: 4 additions & 2 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
GetJobStatusResult, KeyValuePair, PartitionLocation,
};
use crate::utils::create_grpc_client_connection;
use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -238,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
self.session_id.clone(),
query,
self.config.default_grpc_client_max_message_size(),
GrpcClientConfig::from(&self.config),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
Expand Down Expand Up @@ -274,10 +275,11 @@ async fn execute_query(
session_id: String,
query: ExecuteQueryParams,
max_message_size: usize,
grpc_config: GrpcClientConfig,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
let connection = create_grpc_client_connection(scheduler_url)
let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

Expand Down
117 changes: 108 additions & 9 deletions ballista/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::config::BallistaConfig;
use crate::error::{BallistaError, Result};
use crate::extension::SessionConfigExt;
use crate::serde::scheduler::PartitionStats;
Expand All @@ -36,6 +37,95 @@ use std::{fs::File, pin::Pin};
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};

/// Configuration for gRPC client connections.
///
/// This struct holds timeout and keep-alive settings that are applied
/// when establishing gRPC connections from executors to schedulers or
/// between distributed components.
///
/// # Examples
///
/// ```
/// use ballista_core::config::BallistaConfig;
/// use ballista_core::utils::GrpcClientConfig;
///
/// let ballista_config = BallistaConfig::default();
/// let grpc_config = GrpcClientConfig::from(&ballista_config);
/// ```
#[derive(Debug, Clone)]
pub struct GrpcClientConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

/// Connection timeout in seconds
pub connect_timeout_seconds: u64,
/// Request timeout in seconds
pub timeout_seconds: u64,
/// TCP keep-alive interval in seconds
pub tcp_keepalive_seconds: u64,
/// HTTP/2 keep-alive ping interval in seconds
pub http2_keepalive_interval_seconds: u64,
}

impl From<&BallistaConfig> for GrpcClientConfig {
fn from(config: &BallistaConfig) -> Self {
Self {
connect_timeout_seconds: config.default_grpc_client_connect_timeout_seconds()
as u64,
timeout_seconds: config.default_grpc_client_timeout_seconds() as u64,
tcp_keepalive_seconds: config.default_grpc_client_tcp_keepalive_seconds()
as u64,
http2_keepalive_interval_seconds: config
.default_grpc_client_http2_keepalive_interval_seconds()
as u64,
}
}
}

impl Default for GrpcClientConfig {
fn default() -> Self {
Self {
connect_timeout_seconds: 20,
timeout_seconds: 20,
tcp_keepalive_seconds: 3600,
http2_keepalive_interval_seconds: 300,
}
}
}

/// Configuration for gRPC server.
///
/// This struct holds timeout and keep-alive settings that are applied
/// when creating gRPC servers in executors and schedulers.
///
/// # Examples
///
/// ```
/// use ballista_core::utils::GrpcServerConfig;
///
/// let server_config = GrpcServerConfig::default();
/// let server = ballista_core::utils::create_grpc_server(&server_config);
/// ```
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

/// Request timeout in seconds
pub timeout_seconds: u64,
/// TCP keep-alive interval in seconds
pub tcp_keepalive_seconds: u64,
/// HTTP/2 keep-alive ping interval in seconds
pub http2_keepalive_interval_seconds: u64,
/// HTTP/2 keep-alive ping timeout in seconds
pub http2_keepalive_timeout_seconds: u64,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
timeout_seconds: 20,
tcp_keepalive_seconds: 3600,
http2_keepalive_interval_seconds: 300,
http2_keepalive_timeout_seconds: 20,
}
}
}

/// Default session builder using the provided configuration
pub fn default_session_builder(
config: SessionConfig,
Expand Down Expand Up @@ -106,31 +196,40 @@ pub async fn collect_stream(

pub async fn create_grpc_client_connection<D>(
dst: D,
config: &GrpcClientConfig,
) -> std::result::Result<Channel, Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let endpoint = tonic::transport::Endpoint::new(dst)?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
.timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keep_alive_interval(Duration::from_secs(300))
.tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
.http2_keep_alive_interval(Duration::from_secs(
config.http2_keepalive_interval_seconds,
))
// Use a fixed timeout for keep-alive pings to keep configuration simple
// since this is a standalone configuration
.keep_alive_timeout(Duration::from_secs(20))
.keep_alive_while_idle(true);
endpoint.connect().await
}

pub fn create_grpc_server() -> Server {
pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
Server::builder()
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
.http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
.tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
.http2_keepalive_interval(Some(Duration::from_secs(
config.http2_keepalive_interval_seconds,
)))
.http2_keepalive_timeout(Some(Duration::from_secs(
config.http2_keepalive_timeout_seconds,
)))
}

pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
grpc_server_config: ballista_core::utils::GrpcServerConfig::default(),
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
override_execution_engine: None,
override_function_registry: None,
Expand Down
Loading
Loading