Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,12 @@ mod supported {
.await?;

let expected = [
"+---------------------------------------+----------+",
"| name | value |",
"+---------------------------------------+----------+",
"| ballista.grpc_client_max_message_size | 16777216 |",
"| ballista.job.name | |",
"+---------------------------------------+----------+",
"+-------------------------------------------------------+-------+",
"| name | value |",
"+-------------------------------------------------------+-------+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please update query to return ballista.job.name only. so we don't change the test every time we add new option.

"| ballista.grpc.client.connect_timeout_seconds | 20 |",
"| ballista.grpc.client.http2_keepalive_interval_seconds | 300 |",
"+-------------------------------------------------------+-------+",
];

assert_batches_eq!(expected, &result);
Expand Down
14 changes: 7 additions & 7 deletions ballista/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use datafusion::error::DataFusionError;
use datafusion::error::Result;

use crate::serde::protobuf;
use crate::utils::create_grpc_client_connection;
use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::{debug, warn};
Expand All @@ -71,15 +71,15 @@ impl BallistaClient {
max_message_size: usize,
) -> BResult<Self> {
let addr = format!("http://{host}:{port}");
let grpc_config = GrpcClientConfig::default();
debug!("BallistaClient connecting to {addr}");
let connection =
create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
let connection = create_grpc_client_connection(addr.clone(), &grpc_config)
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
))
})?;
})?;
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
Expand Down
57 changes: 54 additions & 3 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";

// gRPC client timeout configurations
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
"ballista.grpc.client.connect_timeout_seconds";
pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
"ballista.grpc.client.timeout_seconds";
pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
"ballista.grpc.client.tcp_keepalive_seconds";
pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
"ballista.grpc.client.http2_keepalive_interval_seconds";
pub const BALLISTA_STANDALONE_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS: &str =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need standalone config? Standalone is used just for local testing, maybe if we can live without it. It would make configuration options simpler. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. You're right. We can make configuration options simpler.

"ballista.standalone.grpc.client.keepalive_timeout_seconds";

pub type ParseResult<T> = result::Result<T, String>;
use std::sync::LazyLock;

Expand All @@ -48,8 +60,8 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Sets the job name that will appear in the web user interface for any submitted jobs".to_string(),
DataType::Utf8, None),
ConfigEntry::new(BALLISTA_STANDALONE_PARALLELISM.to_string(),
"Standalone processing parallelism ".to_string(),
DataType::UInt16, Some(std::thread::available_parallelism().map(|v| v.get()).unwrap_or(1).to_string())),
"Standalone processing parallelism ".to_string(),
DataType::UInt16, Some(std::thread::available_parallelism().map(|v| v.get()).unwrap_or(1).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE.to_string(),
"Configuration for max message size in gRPC clients".to_string(),
DataType::UInt64,
Expand All @@ -66,7 +78,26 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization".to_string(),
DataType::Boolean,
Some((false).to_string())),

ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
"Connection timeout for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((20).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS.to_string(),
"Request timeout for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((20).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS.to_string(),
"TCP keep-alive interval for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((3600).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((300).to_string())),
ConfigEntry::new(BALLISTA_STANDALONE_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS.to_string(),
"Keep-alive timeout for gRPC client in seconds (standalone mode only)".to_string(),
DataType::UInt64,
Some((20).to_string()))
];
entries
.into_iter()
Expand Down Expand Up @@ -188,6 +219,26 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
}

pub fn default_grpc_client_connect_timeout_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS)
}

pub fn default_grpc_client_timeout_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS)
}

pub fn default_grpc_client_tcp_keepalive_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS)
}

pub fn default_grpc_client_http2_keepalive_interval_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS)
}

pub fn default_standalone_grpc_client_keepalive_timeout_seconds(&self) -> usize {
self.get_usize_setting(BALLISTA_STANDALONE_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS)
}

/// Forces the shuffle reader to always read partitions via the Arrow Flight client,
/// even when partitions are local to the node.
///
Expand Down
6 changes: 4 additions & 2 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
GetJobStatusResult, KeyValuePair, PartitionLocation,
};
use crate::utils::create_grpc_client_connection;
use crate::utils::{create_grpc_client_connection, GrpcClientConfig};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -238,6 +238,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
self.session_id.clone(),
query,
self.config.default_grpc_client_max_message_size(),
GrpcClientConfig::from_ballista_config(&self.config),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
Expand Down Expand Up @@ -274,10 +275,11 @@ async fn execute_query(
session_id: String,
query: ExecuteQueryParams,
max_message_size: usize,
grpc_config: GrpcClientConfig,
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
let connection = create_grpc_client_connection(scheduler_url)
let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

Expand Down
86 changes: 76 additions & 10 deletions ballista/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::config::BallistaConfig;
use crate::error::{BallistaError, Result};
use crate::extension::SessionConfigExt;
use crate::serde::scheduler::PartitionStats;
Expand All @@ -36,6 +37,64 @@ use std::{fs::File, pin::Pin};
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};

#[derive(Debug, Clone)]
pub struct GrpcClientConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

pub connect_timeout_seconds: u64,
pub timeout_seconds: u64,
pub tcp_keepalive_seconds: u64,
pub http2_keepalive_interval_seconds: u64,
pub keepalive_timeout_seconds: u64,
}

impl GrpcClientConfig {
pub fn from_ballista_config(config: &BallistaConfig) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to implement this as From or Into not sure which one would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that GrpcClientConfig is derived by extracting specific configs from BallistaConfig, I think From is more appropriate than Into here. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From make sense, it will implement into as well, if i'm not mistaken

Self {
connect_timeout_seconds: config.default_grpc_client_connect_timeout_seconds()
as u64,
timeout_seconds: config.default_grpc_client_timeout_seconds() as u64,
tcp_keepalive_seconds: config.default_grpc_client_tcp_keepalive_seconds()
as u64,
http2_keepalive_interval_seconds: config
.default_grpc_client_http2_keepalive_interval_seconds()
as u64,
keepalive_timeout_seconds: config
.default_standalone_grpc_client_keepalive_timeout_seconds()
as u64,
}
}
}

impl Default for GrpcClientConfig {
fn default() -> Self {
Self {
connect_timeout_seconds: 20,
timeout_seconds: 20,
tcp_keepalive_seconds: 3600,
http2_keepalive_interval_seconds: 300,
keepalive_timeout_seconds: 20,
}
}
}

#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

pub timeout_seconds: u64,
pub tcp_keepalive_seconds: u64,
pub http2_keepalive_interval_seconds: u64,
pub http2_keepalive_timeout_seconds: u64,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
timeout_seconds: 20,
tcp_keepalive_seconds: 3600,
http2_keepalive_interval_seconds: 300,
http2_keepalive_timeout_seconds: 20,
}
}
}

/// Default session builder using the provided configuration
pub fn default_session_builder(
config: SessionConfig,
Expand Down Expand Up @@ -106,31 +165,38 @@ pub async fn collect_stream(

pub async fn create_grpc_client_connection<D>(
dst: D,
config: &GrpcClientConfig,
) -> std::result::Result<Channel, Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let endpoint = tonic::transport::Endpoint::new(dst)?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.connect_timeout(Duration::from_secs(config.connect_timeout_seconds))
.timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keep_alive_interval(Duration::from_secs(300))
.keep_alive_timeout(Duration::from_secs(20))
.tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
.http2_keep_alive_interval(Duration::from_secs(
config.http2_keepalive_interval_seconds,
))
.keep_alive_timeout(Duration::from_secs(config.keepalive_timeout_seconds))
.keep_alive_while_idle(true);
endpoint.connect().await
}

pub fn create_grpc_server() -> Server {
pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
Server::builder()
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(config.timeout_seconds))
// Disable Nagle's Algorithm since we don't want packets to wait
.tcp_nodelay(true)
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keepalive_interval(Option::Some(Duration::from_secs(300)))
.http2_keepalive_timeout(Option::Some(Duration::from_secs(20)))
.tcp_keepalive(Some(Duration::from_secs(config.tcp_keepalive_seconds)))
.http2_keepalive_interval(Some(Duration::from_secs(
config.http2_keepalive_interval_seconds,
)))
.http2_keepalive_timeout(Some(Duration::from_secs(
config.http2_keepalive_timeout_seconds,
)))
}

pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
grpc_server_config: ballista_core::utils::GrpcServerConfig::default(),
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
override_execution_engine: None,
override_function_registry: None,
Expand Down
Loading