Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,10 @@ message JobStatus {

// Response to a job status request.
message GetJobStatusResult {
// Status reported for the requested job.
JobStatus status = 1;
// Tells the client where to fetch result partitions from.
// When unset, the client connects directly to the executor that
// holds the partition.
oneof flight_proxy {
// true: fetch results through the scheduler's own host and port;
// false: treated by the client the same as unset (fetch from the executor).
bool local = 2;
// Address of an external flight proxy. The client parses it as
// "http://<external>" and uses the resulting host and port.
string external = 3;
}
}

message FilePartitionMetadata {
Expand Down
141 changes: 133 additions & 8 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

use crate::client::BallistaClient;
use crate::config::BallistaConfig;
use crate::serde::protobuf::SuccessfulJob;
use crate::serde::protobuf::get_job_status_result::FlightProxy;
use crate::serde::protobuf::{
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation, execute_query_params::Query, execute_query_result, job_status,
scheduler_grpc_client::SchedulerGrpcClient,
};
use crate::serde::protobuf::{ExecutorMetadata, SuccessfulJob};
use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
Expand All @@ -49,6 +50,7 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use url::Url;

/// This operator sends a logical plan to a Ballista scheduler for execution and
/// polls the scheduler until the query is complete and then fetches the resulting
Expand Down Expand Up @@ -295,7 +297,7 @@ async fn execute_query(

info!("Connecting to Ballista scheduler at {scheduler_url}");
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
let connection = create_grpc_client_connection(scheduler_url.clone(), &grpc_config)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

Expand Down Expand Up @@ -327,7 +329,10 @@ async fn execute_query(
let mut prev_status: Option<job_status::Status> = None;

loop {
let GetJobStatusResult { status } = scheduler
let GetJobStatusResult {
status,
flight_proxy,
} = scheduler
.get_job_status(GetJobStatusParams {
job_id: job_id.clone(),
})
Expand Down Expand Up @@ -403,8 +408,14 @@ async fn execute_query(
// This could be added in a future enhancement by wrapping the stream.

let streams = partition_location.into_iter().map(move |partition| {
let f = fetch_partition(partition, max_message_size, true)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
let f = fetch_partition(
partition,
max_message_size,
true,
scheduler_url.clone(),
flight_proxy.clone(),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));

futures::stream::once(f).try_flatten()
});
Expand All @@ -415,22 +426,75 @@ async fn execute_query(
}
}

fn get_client_host_port(
executor_metadata: &ExecutorMetadata,
scheduler_url: &str,
flight_proxy: &Option<FlightProxy>,
) -> Result<(String, u16)> {
fn split_host_port(address: &str) -> Result<(String, u16)> {
let url: Url = address.parse().map_err(|e| {
DataFusionError::Execution(format!(
"Cannot parse host:port in {address:?}: {e}"
))
})?;
let host = url
.host_str()
.ok_or(DataFusionError::Execution(format!(
"No host in {address:?}"
)))?
.to_string();
let port: u16 = url.port().ok_or(DataFusionError::Execution(format!(
"No port in {address:?}"
)))?;
Ok((host, port))
}

match flight_proxy {
Some(FlightProxy::External(address)) => {
debug!("Fetching results from external flight proxy: {}", address);
split_host_port(format!("http://{address}").as_str())
}
Some(FlightProxy::Local(true)) => {
debug!("Fetching results from scheduler: {}", scheduler_url);
split_host_port(scheduler_url)
}
Some(FlightProxy::Local(false)) | None => {
debug!(
"Fetching results from executor: {}:{}",
executor_metadata.host, executor_metadata.port
);
Ok((
executor_metadata.host.clone(),
executor_metadata.port as u16,
))
}
}
}

async fn fetch_partition(
location: PartitionLocation,
max_message_size: usize,
flight_transport: bool,
scheduler_url: String,
flight_proxy: Option<FlightProxy>,
) -> Result<SendableRecordBatchStream> {
let metadata = location.executor_meta.ok_or_else(|| {
DataFusionError::Internal("Received empty executor metadata".to_owned())
})?;

let partition_id = location.partition_id.ok_or_else(|| {
DataFusionError::Internal("Received empty partition id".to_owned())
})?;
let host = metadata.host.as_str();
let port = metadata.port as u16;
let mut ballista_client = BallistaClient::try_new(host, port, max_message_size)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let (client_host, client_port) =
get_client_host_port(&metadata, &scheduler_url, &flight_proxy)?;

let mut ballista_client =
BallistaClient::try_new(client_host.as_str(), client_port, max_message_size)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
ballista_client
.fetch_partition(
&metadata.id,
Expand All @@ -443,3 +507,64 @@ async fn fetch_partition(
.await
.map_err(|e| DataFusionError::External(Box::new(e)))
}

#[cfg(test)]
mod test {
    use crate::execution_plans::distributed_query::get_client_host_port;
    use crate::serde::protobuf::ExecutorMetadata;
    use crate::serde::protobuf::get_job_status_result::FlightProxy;

    /// Checks that every flight-proxy setting resolves to the expected
    /// `(host, port)` endpoint.
    #[test]
    fn test_client_host_port() {
        let scheduler_host = "scheduler";
        let scheduler_port: u16 = 5000;
        let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");

        let executor = ExecutorMetadata {
            id: "test".to_string(),
            host: "executor".to_string(),
            port: 12345,
            grpc_port: 1,
            specification: None,
        };

        // The three endpoints a client can be routed to.
        let executor_endpoint = (executor.host.clone(), executor.port as u16);
        let scheduler_endpoint = (scheduler_host.to_string(), scheduler_port);
        let proxy_endpoint = ("proxy".to_string(), 1234_u16);

        let cases = [
            // no flight proxy -> client should fetch results from executor
            (None, executor_endpoint.clone()),
            // same, no flight proxy
            (Some(FlightProxy::Local(false)), executor_endpoint),
            // embedded flight proxy on scheduler
            (Some(FlightProxy::Local(true)), scheduler_endpoint),
            // external proxy
            (
                Some(FlightProxy::External("proxy:1234".to_string())),
                proxy_endpoint,
            ),
        ];

        for (case_no, (flight_proxy, expected)) in cases.into_iter().enumerate() {
            let resolved =
                get_client_host_port(&executor, &scheduler_url, &flight_proxy).unwrap();
            assert_eq!(resolved, expected, "case #{case_no}");
        }
    }
}
12 changes: 12 additions & 0 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,18 @@ pub mod job_status {
pub struct GetJobStatusResult {
/// Status reported for the requested job (protobuf field 1).
#[prost(message, optional, tag = "1")]
pub status: ::core::option::Option<JobStatus>,
/// Optional hint telling the client where to fetch results from
/// (protobuf oneof over fields 2 and 3); `None` when the oneof is unset.
#[prost(oneof = "get_job_status_result::FlightProxy", tags = "2, 3")]
pub flight_proxy: ::core::option::Option<get_job_status_result::FlightProxy>,
}
/// Nested message and enum types in `GetJobStatusResult`.
pub mod get_job_status_result {
/// The `flight_proxy` oneof of `GetJobStatusResult`: where the client
/// should fetch result partitions from.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
pub enum FlightProxy {
/// `true`: fetch results via the scheduler's host and port;
/// `false`: handled by the client like an unset proxy (fetch from the executor).
#[prost(bool, tag = "2")]
Local(bool),
/// Address of an external flight proxy; the client parses it as
/// `http://<address>` to obtain host and port.
#[prost(string, tag = "3")]
External(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct FilePartitionMetadata {
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ doc = "Print version of this executable"
[[param]]
name = "advertise_flight_sql_endpoint"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"
doc = "Route for proxying flight results via scheduler. Use 'HOST:PORT' to let clients fetch results from the specified address. If empty a flight proxy will be started on the scheduler host and port."

[[param]]
abbr = "n"
Expand Down
4 changes: 3 additions & 1 deletion ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ pub struct Config {
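/// Route for proxying flight results via scheduler (`HOST:PORT` format).
/// When the flag is given without a value it is set to an empty string,
/// meaning a flight proxy is started on the scheduler host and port.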
#[arg(
long,
help = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT"
num_args = 0..=1,
default_missing_value = "",
help = "Route for proxying flight results via scheduler. Use 'HOST:PORT' to let clients fetch results from the specified address. If empty a flight proxy will be started on the scheduler host and port."
)]
pub advertise_flight_sql_endpoint: Option<String>,
/// Namespace for the ballista cluster.
Expand Down
Loading