diff --git a/Cargo.lock b/Cargo.lock
index bc630ab..0504aa7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1130,6 +1130,7 @@ dependencies = [
  "parquet",
  "prost",
  "rand 0.8.5",
+ "structopt",
  "tokio",
  "tokio-stream",
  "tonic",
diff --git a/Cargo.toml b/Cargo.toml
index aa7982a..bccc5ed 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ integration = [
 ]
 
 [dev-dependencies]
+structopt = "0.3"
 insta = { version = "1.43.1", features = ["filters"] }
 tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
 tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
diff --git a/examples/localhost.md b/examples/localhost.md
new file mode 100644
index 0000000..64e35f4
--- /dev/null
+++ b/examples/localhost.md
@@ -0,0 +1,89 @@
+# Localhost workers example
+
+This example executes a SQL query in a distributed context.
+
+For this example to work, it's necessary to spawn some localhost workers with the `localhost_worker.rs` example.
+
+## Preparation
+
+This example queries a couple of test parquet files we have for integration tests. Those files are stored using `git lfs`,
+so they must be pulled first:
+
+```shell
+git lfs checkout
+```
+
+### Spawning the workers
+
+In two different terminals, spawn two `ArrowFlightEndpoint`s:
+
+```shell
+cargo run --example localhost_worker -- 8080 --cluster-ports 8080,8081
+```
+
+```shell
+cargo run --example localhost_worker -- 8081 --cluster-ports 8080,8081
+```
+
+- The positional numeric argument is the port on which each Arrow Flight endpoint will listen.
+- The `--cluster-ports` parameter tells each Arrow Flight endpoint which localhost workers are available in the cluster.
+
+### Issuing a distributed SQL query
+
+Now, DataFusion queries can be issued using these workers as part of the cluster:
+
+```shell
+cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081
+```
+
+The head stage (the one that outputs data to the user) will be executed locally in the same process as that `cargo run`
+command, but further stages will be delegated to the workers running on ports 8080 and 8081.
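+
+Under the hood, `localhost_run.rs` builds a session with a `DistributedPhysicalOptimizerRule` (which is what
+splits the plan into stages) and a channel resolver pointing at the workers. A minimal sketch of that wiring,
+assuming a `localhost_resolver` already built from the `--cluster-ports` list (see `examples/localhost_run.rs`
+for the full code):
+
+```rust
+let state = SessionStateBuilder::new()
+    .with_default_features()
+    .with_distributed_channel_resolver(localhost_resolver)
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
+    .build();
+let ctx = SessionContext::from(state);
+// Queries issued through `ctx` now run their head stage locally and delegate
+// the rest to the workers.
+```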
+
+Additionally, the `--explain` flag can be passed to render the distributed plan:
+
+```shell
+cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081 --explain
+```
+
+### Available tables
+
+Two tables are available in this example:
+
+- `flights_1m`: Flight data with 1M rows
+
+```
+FL_DATE [INT32]
+DEP_DELAY [INT32]
+ARR_DELAY [INT32]
+AIR_TIME [INT32]
+DISTANCE [INT32]
+DEP_TIME [FLOAT]
+ARR_TIME [FLOAT]
+```
+
+- `weather`: Small dataset of weather data
+
+```
+MinTemp [DOUBLE]
+MaxTemp [DOUBLE]
+Rainfall [DOUBLE]
+Evaporation [DOUBLE]
+Sunshine [BYTE_ARRAY]
+WindGustDir [BYTE_ARRAY]
+WindGustSpeed [BYTE_ARRAY]
+WindDir9am [BYTE_ARRAY]
+WindDir3pm [BYTE_ARRAY]
+WindSpeed9am [BYTE_ARRAY]
+WindSpeed3pm [INT64]
+Humidity9am [INT64]
+Humidity3pm [INT64]
+Pressure9am [DOUBLE]
+Pressure3pm [DOUBLE]
+Cloud9am [INT64]
+Cloud3pm [INT64]
+Temp9am [DOUBLE]
+Temp3pm [DOUBLE]
+RainToday [BYTE_ARRAY]
+RISK_MM [DOUBLE]
+RainTomorrow [BYTE_ARRAY]
+```
diff --git a/examples/localhost_run.rs b/examples/localhost_run.rs
new file mode 100644
index 0000000..95d7ca6
--- /dev/null
+++ b/examples/localhost_run.rs
@@ -0,0 +1,106 @@
+use arrow::util::pretty::pretty_format_batches;
+use async_trait::async_trait;
+use dashmap::{DashMap, Entry};
+use datafusion::common::DataFusionError;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::displayable;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion_distributed::{
+    BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule,
+};
+use futures::TryStreamExt;
+use std::error::Error;
+use std::sync::Arc;
+use structopt::StructOpt;
+use tonic::transport::Channel;
+use url::Url;
+
+#[derive(StructOpt)]
+#[structopt(name = "run", about = "A localhost Distributed DataFusion runner")]
+struct Args {
+    #[structopt()]
+    query: String,
+
+    // --cluster-ports 8080,8081,8082
+    #[structopt(long = "cluster-ports", use_delimiter = true)]
+    cluster_ports: Vec<u16>,
+
+    #[structopt(long)]
+    explain: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let args = Args::from_args();
+
+    let localhost_resolver = LocalhostChannelResolver {
+        ports: args.cluster_ports,
+        cached: DashMap::new(),
+    };
+
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_distributed_channel_resolver(localhost_resolver)
+        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
+        .build();
+
+    let ctx = SessionContext::from(state);
+
+    ctx.register_parquet(
+        "flights_1m",
+        "testdata/flights-1m.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    ctx.register_parquet(
+        "weather",
+        "testdata/weather.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    let df = ctx.sql(&args.query).await?;
+    if args.explain {
+        let plan = df.create_physical_plan().await?;
+        let display = displayable(plan.as_ref()).indent(true).to_string();
+        println!("{display}");
+    } else {
+        let stream = df.execute_stream().await?;
+        let batches = stream.try_collect::<Vec<_>>().await?;
+        let formatted = pretty_format_batches(&batches)?;
+        println!("{formatted}");
+    }
+    Ok(())
+}
+
+#[derive(Clone)]
+struct LocalhostChannelResolver {
+    ports: Vec<u16>,
+    cached: DashMap<Url, BoxCloneSyncChannel>,
+}
+
+#[async_trait]
+impl ChannelResolver for LocalhostChannelResolver {
+    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
+        Ok(self
+            .ports
+            .iter()
+            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
+            .collect())
+    }
+
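+    // Channels are created lazily and memoized per URL: `connect_lazy` defers
+    // the actual connection, and the `DashMap` cache lets every stage that
+    // targets the same worker reuse a single channel.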
+    async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
+        match self.cached.entry(url.clone()) {
+            Entry::Occupied(v) => Ok(v.get().clone()),
+            Entry::Vacant(v) => {
+                let channel = Channel::from_shared(url.to_string())
+                    .unwrap()
+                    .connect_lazy();
+                let channel = BoxCloneSyncChannel::new(channel);
+                v.insert(channel.clone());
+                Ok(channel)
+            }
+        }
+    }
+}
diff --git a/examples/localhost_worker.rs b/examples/localhost_worker.rs
new file mode 100644
index 0000000..b2477ec
--- /dev/null
+++ b/examples/localhost_worker.rs
@@ -0,0 +1,84 @@
+use arrow_flight::flight_service_server::FlightServiceServer;
+use async_trait::async_trait;
+use dashmap::{DashMap, Entry};
+use datafusion::common::DataFusionError;
+use datafusion::execution::SessionStateBuilder;
+use datafusion_distributed::{
+    ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
+    DistributedSessionBuilderContext,
+};
+use std::error::Error;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use structopt::StructOpt;
+use tonic::transport::{Channel, Server};
+use url::Url;
+
+#[derive(StructOpt)]
+#[structopt(name = "localhost_worker", about = "A localhost DataFusion worker")]
+struct Args {
+    #[structopt(default_value = "8080")]
+    port: u16,
+
+    // --cluster-ports 8080,8081,8082
+    #[structopt(long = "cluster-ports", use_delimiter = true)]
+    cluster_ports: Vec<u16>,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let args = Args::from_args();
+
+    let localhost_resolver = LocalhostChannelResolver {
+        ports: args.cluster_ports,
+        cached: DashMap::new(),
+    };
+
+    let endpoint = ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
+        let local_host_resolver = localhost_resolver.clone();
+        async move {
+            Ok(SessionStateBuilder::new()
+                .with_runtime_env(ctx.runtime_env)
+                .with_distributed_channel_resolver(local_host_resolver)
+                .with_default_features()
+                .build())
+        }
+    })?;
+
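+    // The session builder closure passed to `try_new` above constructs the
+    // SessionState this worker uses when executing the stages it receives; it
+    // registers the same channel resolver so the worker can reach its peers.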
+    Server::builder()
+        .add_service(FlightServiceServer::new(endpoint))
+        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
+        .await?;
+
+    Ok(())
+}
+
+#[derive(Clone)]
+struct LocalhostChannelResolver {
+    ports: Vec<u16>,
+    cached: DashMap<Url, BoxCloneSyncChannel>,
+}
+
+#[async_trait]
+impl ChannelResolver for LocalhostChannelResolver {
+    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
+        Ok(self
+            .ports
+            .iter()
+            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
+            .collect())
+    }
+
+    async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
+        match self.cached.entry(url.clone()) {
+            Entry::Occupied(v) => Ok(v.get().clone()),
+            Entry::Vacant(v) => {
+                let channel = Channel::from_shared(url.to_string())
+                    .unwrap()
+                    .connect_lazy();
+                let channel = BoxCloneSyncChannel::new(channel);
+                v.insert(channel.clone());
+                Ok(channel)
+            }
+        }
+    }
+}
diff --git a/src/flight_service/do_get.rs b/src/flight_service/do_get.rs
index 916c7f7..bda998b 100644
--- a/src/flight_service/do_get.rs
+++ b/src/flight_service/do_get.rs
@@ -181,7 +181,7 @@ mod tests {
 
         // Create ArrowFlightEndpoint with DefaultSessionBuilder
         let endpoint =
-            ArrowFlightEndpoint::new(DefaultSessionBuilder).expect("Failed to create endpoint");
+            ArrowFlightEndpoint::try_new(DefaultSessionBuilder).expect("Failed to create endpoint");
 
         // Create 3 tasks with 3 partitions each.
         let num_tasks = 3;
@@ -210,7 +210,7 @@ mod tests {
             tasks,
         };
 
-        let task_keys = vec![
+        let task_keys = [
             StageKey {
                 query_id: query_id_uuid.to_string(),
                 stage_id,
diff --git a/src/flight_service/service.rs b/src/flight_service/service.rs
index 510627d..3b02cb3 100644
--- a/src/flight_service/service.rs
+++ b/src/flight_service/service.rs
@@ -36,7 +36,7 @@ pub struct ArrowFlightEndpoint {
 }
 
 impl ArrowFlightEndpoint {
-    pub fn new(
+    pub fn try_new(
         session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
     ) -> Result<Self, DataFusionError> {
         let ttl_map = TTLMap::try_new(TTLMapConfig::default())?;
diff --git a/src/test_utils/localhost.rs b/src/test_utils/localhost.rs
index 2dcda1b..5cdca71 100644
--- a/src/test_utils/localhost.rs
+++ b/src/test_utils/localhost.rs
@@ -109,7 +109,7 @@ pub async fn spawn_flight_service(
     session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
     incoming: TcpListener,
 ) -> Result<(), Box<dyn Error>> {
-    let endpoint = ArrowFlightEndpoint::new(session_builder)?;
+    let endpoint = ArrowFlightEndpoint::try_new(session_builder)?;
     let incoming = tokio_stream::wrappers::TcpListenerStream::new(incoming);