@@ -31,32 +31,37 @@ use datafusion::{
3131pub const BALLISTA_JOB_NAME : & str = "ballista.job.name" ;
3232/// Configuration key for standalone processing parallelism. Registered in `CONFIG_ENTRIES` as `UInt16`, with a value taken from `std::thread::available_parallelism()` and a fallback of `1`.
3333pub const BALLISTA_STANDALONE_PARALLELISM : & str = "ballista.standalone.parallelism" ;
34+
35+ // gRPC configuration
36+
37+ /// Configuration key for gRPC client connection timeout in seconds. Registered in `CONFIG_ENTRIES` as `UInt64` with value `20`.
38+ pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS : & str =
39+ "ballista.grpc.client.connect_timeout_seconds" ;
40+ /// Configuration key for HTTP/2 keep-alive interval for gRPC clients in seconds. Registered in `CONFIG_ENTRIES` as `UInt64` with value `300`.
41+ pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS : & str =
42+ "ballista.grpc.client.http2_keepalive_interval_seconds" ;
3443/// Configuration key for the maximum message size for gRPC clients. Registered in `CONFIG_ENTRIES` as `UInt64` with value `16 * 1024 * 1024`. NOTE(review): this key uses the flat `grpc_client_` spelling rather than the dotted `grpc.client.` prefix of the other gRPC keys; renaming it would change the runtime key.
3544pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE : & str =
3645 "ballista.grpc_client_max_message_size" ;
37- /// Configuration key for maximum concurrent shuffle read requests.
38- pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS : & str =
39- "ballista.shuffle.max_concurrent_read_requests" ;
46+ /// Configuration key for TCP keep-alive interval for gRPC clients in seconds. Registered in `CONFIG_ENTRIES` as `UInt64` with value `3600`.
47+ pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS : & str =
48+ "ballista.grpc.client.tcp_keepalive_seconds" ;
49+ /// Configuration key for gRPC client request timeout in seconds. Registered in `CONFIG_ENTRIES` as `UInt64` with value `20`.
50+ pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS : & str =
51+ "ballista.grpc.client.timeout_seconds" ;
52+
53+ // Shuffle reader configuration
54+
4055/// Configuration key to force remote reads even for local partitions. Registered in `CONFIG_ENTRIES` as `Boolean` with value `false`.
4156pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ : & str =
4257 "ballista.shuffle.force_remote_read" ;
58+ /// Configuration key for maximum concurrent shuffle read requests. Registered in `CONFIG_ENTRIES` as `UInt64` with value `64`.
59+ pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS : & str =
60+ "ballista.shuffle.max_concurrent_read_requests" ;
4361/// Configuration key to prefer Flight protocol for remote shuffle reads. Registered in `CONFIG_ENTRIES` as `Boolean` with value `false`.
4462pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT : & str =
4563 "ballista.shuffle.remote_read_prefer_flight" ;
4664
47- /// Configuration key for gRPC client connection timeout in seconds.
48- pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS : & str =
49- "ballista.grpc.client.connect_timeout_seconds" ;
50- /// Configuration key for gRPC client request timeout in seconds.
51- pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS : & str =
52- "ballista.grpc.client.timeout_seconds" ;
53- /// Configuration key for TCP keep-alive interval for gRPC clients in seconds.
54- pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS : & str =
55- "ballista.grpc.client.tcp_keepalive_seconds" ;
56- /// Configuration key for HTTP/2 keep-alive interval for gRPC clients in seconds.
57- pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS : & str =
58- "ballista.grpc.client.http2_keepalive_interval_seconds" ;
59-
6065/// Result type for configuration parsing operations; the error variant is a plain `String`.
6166pub type ParseResult < T > = result:: Result < T , String > ;
6267use std:: sync:: LazyLock ;
@@ -69,38 +74,43 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
6974 ConfigEntry :: new( BALLISTA_STANDALONE_PARALLELISM . to_string( ) ,
7075 "Standalone processing parallelism " . to_string( ) ,
7176 DataType :: UInt16 , Some ( std:: thread:: available_parallelism( ) . map( |v| v. get( ) ) . unwrap_or( 1 ) . to_string( ) ) ) ,
77+
78+ // gRPC configuration
79+ ConfigEntry :: new( BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS . to_string( ) ,
80+ "Connection timeout for gRPC client in seconds" . to_string( ) ,
81+ DataType :: UInt64 ,
82+ Some ( ( 20 ) . to_string( ) ) ) ,
83+ ConfigEntry :: new( BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS . to_string( ) ,
84+ "HTTP/2 keep-alive interval for gRPC client in seconds" . to_string( ) ,
85+ DataType :: UInt64 ,
86+ Some ( ( 300 ) . to_string( ) ) ) ,
7287 ConfigEntry :: new( BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE . to_string( ) ,
7388 "Configuration for max message size in gRPC clients" . to_string( ) ,
7489 DataType :: UInt64 ,
7590 Some ( ( 16 * 1024 * 1024 ) . to_string( ) ) ) ,
76- ConfigEntry :: new( BALLISTA_SHUFFLE_READER_MAX_REQUESTS . to_string( ) ,
77- "Maximum concurrent requests shuffle reader can process " . to_string( ) ,
91+ ConfigEntry :: new( BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS . to_string( ) ,
92+ "TCP keep-alive interval for gRPC client in seconds " . to_string( ) ,
7893 DataType :: UInt64 ,
79- Some ( ( 64 ) . to_string( ) ) ) ,
94+ Some ( ( 3600 ) . to_string( ) ) ) ,
95+ ConfigEntry :: new( BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS . to_string( ) ,
96+ "Request timeout for gRPC client in seconds" . to_string( ) ,
97+ DataType :: UInt64 ,
98+ Some ( ( 20 ) . to_string( ) ) ) ,
99+
100+
101+ // Shuffle reader configuration
80102 ConfigEntry :: new( BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ . to_string( ) ,
81103 "Forces the shuffle reader to always read partitions via the Arrow Flight client, even when partitions are local to the node." . to_string( ) ,
82104 DataType :: Boolean ,
83105 Some ( ( false ) . to_string( ) ) ) ,
106+ ConfigEntry :: new( BALLISTA_SHUFFLE_READER_MAX_REQUESTS . to_string( ) ,
107+ "Maximum concurrent requests shuffle reader can process" . to_string( ) ,
108+ DataType :: UInt64 ,
109+ Some ( ( 64 ) . to_string( ) ) ) ,
84110 ConfigEntry :: new( BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT . to_string( ) ,
85111 "Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization" . to_string( ) ,
86112 DataType :: Boolean ,
87113 Some ( ( false ) . to_string( ) ) ) ,
88- ConfigEntry :: new( BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS . to_string( ) ,
89- "Connection timeout for gRPC client in seconds" . to_string( ) ,
90- DataType :: UInt64 ,
91- Some ( ( 20 ) . to_string( ) ) ) ,
92- ConfigEntry :: new( BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS . to_string( ) ,
93- "Request timeout for gRPC client in seconds" . to_string( ) ,
94- DataType :: UInt64 ,
95- Some ( ( 20 ) . to_string( ) ) ) ,
96- ConfigEntry :: new( BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS . to_string( ) ,
97- "TCP keep-alive interval for gRPC client in seconds" . to_string( ) ,
98- DataType :: UInt64 ,
99- Some ( ( 3600 ) . to_string( ) ) ) ,
100- ConfigEntry :: new( BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS . to_string( ) ,
101- "HTTP/2 keep-alive interval for gRPC client in seconds" . to_string( ) ,
102- DataType :: UInt64 ,
103- Some ( ( 300 ) . to_string( ) ) )
104114 ] ;
105115 entries
106116 . into_iter ( )
@@ -212,6 +222,19 @@ impl BallistaConfig {
212222 & self . settings
213223 }
214224
225+ /// Allows skipping redundant validation of arrow IPC valid data.
226+ pub fn arrow_ipc_reader_skip_validation ( & self ) -> bool {
227+ #[ cfg( feature = "arrow-ipc-optimizations" ) ]
228+ {
229+ true
230+ }
231+
232+ #[ cfg( not( feature = "arrow-ipc-optimizations" ) ) ]
233+ {
234+ false
235+ }
236+ }
237+
215238 /// Returns the maximum message size for gRPC clients in bytes.
216239 pub fn default_grpc_client_max_message_size ( & self ) -> usize {
217240 self . get_usize_setting ( BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE )
0 commit comments