diff --git a/Cargo.lock b/Cargo.lock
index 0504aa7..f4ab59e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1124,6 +1124,7 @@ dependencies = [
  "delegate",
  "futures",
  "http",
+ "hyper-util",
  "insta",
  "itertools",
  "object_store",
diff --git a/Cargo.toml b/Cargo.toml
index f1e7cd0..18d955d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
 parquet = { version = "55.2.0", optional = true }
 arrow = { version = "55.2.0", optional = true }
 tokio-stream = { version = "0.1.17", optional = true }
+hyper-util = { version = "0.1.16", optional = true }
 
 [features]
 integration = [
@@ -45,6 +46,7 @@ integration = [
     "parquet",
     "arrow",
     "tokio-stream",
+    "hyper-util"
 ]
 
 tpch = ["integration"]
@@ -56,3 +58,4 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
 parquet = "55.2.0"
 arrow = "55.2.0"
 tokio-stream = "0.1.17"
+hyper-util = "0.1.16"
diff --git a/examples/in_memory.md b/examples/in_memory.md
new file mode 100644
index 0000000..bcd8907
--- /dev/null
+++ b/examples/in_memory.md
@@ -0,0 +1,72 @@
+# In-memory cluster example
+
+This example shows how queries can be run in a distributed context without any
+network I/O for communication between workers.
+
+This is especially useful for testing: no servers need to be spawned on localhost ports,
+the setup is quite easy, and the code coverage for running in this mode is the same as
+for running in an actual distributed cluster.
+
+## Preparation
+
+This example queries a couple of test Parquet files used for integration tests. Those files are stored with `git lfs`,
+so they need to be pulled first:
+
+```shell
+git lfs checkout
+```
+
+## Issuing a distributed SQL query
+
+```shell
+cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"'
+```
+
+Additionally, the `--explain` flag can be passed to render the distributed plan:
+
+```shell
+cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --explain
+```
+
+## Available tables
+
+Two tables are available in this example:
+
+- `flights_1m`: flight data with 1 million rows
+
+```
+FL_DATE [INT32]
+DEP_DELAY [INT32]
+ARR_DELAY [INT32]
+AIR_TIME [INT32]
+DISTANCE [INT32]
+DEP_TIME [FLOAT]
+ARR_TIME [FLOAT]
+```
+
+- `weather`: a small weather dataset
+
+```
+MinTemp [DOUBLE]
+MaxTemp [DOUBLE]
+Rainfall [DOUBLE]
+Evaporation [DOUBLE]
+Sunshine [BYTE_ARRAY]
+WindGustDir [BYTE_ARRAY]
+WindGustSpeed [BYTE_ARRAY]
+WindDir9am [BYTE_ARRAY]
+WindDir3pm [BYTE_ARRAY]
+WindSpeed9am [BYTE_ARRAY]
+WindSpeed3pm [INT64]
+Humidity9am [INT64]
+Humidity3pm [INT64]
+Pressure9am [DOUBLE]
+Pressure3pm [DOUBLE]
+Cloud9am [INT64]
+Cloud3pm [INT64]
+Temp9am [DOUBLE]
+Temp3pm [DOUBLE]
+RainToday [BYTE_ARRAY]
+RISK_MM [DOUBLE]
+RainTomorrow [BYTE_ARRAY]
+```
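The core mechanism of the example source further down is that a tonic client channel can be wired to an in-memory `tokio::io::duplex` pipe instead of a TCP socket. Below is a minimal sketch of just that trick, stripped of the Arrow Flight and DataFusion wiring; the function name, buffer size, and dummy URL are illustrative, and it mirrors what `InMemoryChannelResolver::new` does in the full example.

```rust
use hyper_util::rt::TokioIo;
use tokio::io::DuplexStream;
use tonic::transport::{Channel, Endpoint};

/// Builds a lazily-connected tonic `Channel` whose bytes flow through an
/// in-memory duplex pipe rather than a TCP socket. The returned `DuplexStream`
/// is the server half; it would be passed to `Server::serve_with_incoming`
/// (as the full example does) so that client and server live in one process.
fn in_memory_channel() -> (Channel, DuplexStream) {
    let (client_io, server_io) = tokio::io::duplex(1024 * 1024);

    // The URL is never resolved or dialed; it only satisfies Endpoint's API.
    let mut client_io = Some(client_io);
    let channel = Endpoint::try_from("http://localhost:50051")
        .expect("static URL is always valid")
        .connect_with_connector_lazy(tower::service_fn(move |_| {
            // The connector owns a single duplex half, so it can only be used once.
            let io = client_io.take().expect("in-memory connector called twice");
            async move { Ok::<_, std::io::Error>(TokioIo::new(io)) }
        }));

    (channel, server_io)
}
```

Because the connection is lazy, the channel can be created and handed out before the in-process server has even been spawned; no bytes are exchanged until the first RPC.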
diff --git a/examples/in_memory_cluster.rs b/examples/in_memory_cluster.rs
new file mode 100644
index 0000000..01e036e
--- /dev/null
+++ b/examples/in_memory_cluster.rs
@@ -0,0 +1,136 @@
+use arrow::util::pretty::pretty_format_batches;
+use arrow_flight::flight_service_server::FlightServiceServer;
+use async_trait::async_trait;
+use datafusion::common::DataFusionError;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_plan::displayable;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion_distributed::{
+    ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
+    DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
+};
+use futures::TryStreamExt;
+use hyper_util::rt::TokioIo;
+use std::error::Error;
+use std::sync::Arc;
+use structopt::StructOpt;
+use tonic::transport::{Endpoint, Server};
+
+#[derive(StructOpt)]
+#[structopt(
+    name = "run",
+    about = "An in-memory cluster Distributed DataFusion runner"
+)]
+struct Args {
+    #[structopt()]
+    query: String,
+
+    #[structopt(long)]
+    explain: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let args = Args::from_args();
+
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_distributed_channel_resolver(InMemoryChannelResolver::new())
+        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
+        .build();
+
+    let ctx = SessionContext::from(state);
+
+    ctx.register_parquet(
+        "flights_1m",
+        "testdata/flights-1m.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    ctx.register_parquet(
+        "weather",
+        "testdata/weather.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    let df = ctx.sql(&args.query).await?;
+    if args.explain {
+        let plan = df.create_physical_plan().await?;
+        let display = displayable(plan.as_ref()).indent(true).to_string();
+        println!("{display}");
+    } else {
+        let stream = df.execute_stream().await?;
+        let batches = stream.try_collect::<Vec<_>>().await?;
+        let formatted = pretty_format_batches(&batches)?;
+        println!("{formatted}");
+    }
+    Ok(())
+}
+
+const DUMMY_URL: &str = "http://localhost:50051";
+
+/// [ChannelResolver] implementation that returns gRPC clients backed by an in-memory
+/// tokio duplex rather than a TCP connection.
+#[derive(Clone)]
+struct InMemoryChannelResolver {
+    channel: BoxCloneSyncChannel,
+}
+
+impl InMemoryChannelResolver {
+    fn new() -> Self {
+        let (client, server) = tokio::io::duplex(1024 * 1024);
+
+        let mut client = Some(client);
+        let channel = Endpoint::try_from(DUMMY_URL)
+            .expect("Invalid dummy URL for building an endpoint. This should never happen")
+            .connect_with_connector_lazy(tower::service_fn(move |_| {
+                let client = client
+                    .take()
+                    .expect("Client taken twice. This should never happen");
+                async move { Ok::<_, std::io::Error>(TokioIo::new(client)) }
+            }));
+
+        let this = Self {
+            channel: BoxCloneSyncChannel::new(channel),
+        };
+        let this_clone = this.clone();
+
+        let endpoint =
+            ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
+                let this = this.clone();
+                async move {
+                    let builder = SessionStateBuilder::new()
+                        .with_default_features()
+                        .with_distributed_channel_resolver(this)
+                        .with_runtime_env(ctx.runtime_env.clone());
+                    Ok(builder.build())
+                }
+            })
+            .unwrap();
+
+        tokio::spawn(async move {
+            Server::builder()
+                .add_service(FlightServiceServer::new(endpoint))
+                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
+                .await
+        });
+
+        this_clone
+    }
+}
+
+#[async_trait]
+impl ChannelResolver for InMemoryChannelResolver {
+    fn get_urls(&self) -> Result<Vec<url::Url>, DataFusionError> {
+        Ok(vec![url::Url::parse(DUMMY_URL).unwrap()])
+    }
+
+    async fn get_channel_for_url(
+        &self,
+        _: &url::Url,
+    ) -> Result<BoxCloneSyncChannel, DataFusionError> {
+        Ok(self.channel.clone())
+    }
+}
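Since the example README stresses that this mode is especially useful for testing, here is a rough sketch of how the same resolver could drive an integration test. It assumes `InMemoryChannelResolver` (as defined in the example above) has been copied into, or is otherwise visible to, the test module; the test name and assertion are illustrative.

```rust
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};
use std::sync::Arc;

#[tokio::test]
async fn distributed_count_over_weather() -> Result<(), Box<dyn std::error::Error>> {
    // Same wiring as the example's main(), but inside a test: the whole
    // "cluster" lives in this process, so no ports need to be free.
    // `InMemoryChannelResolver` is assumed to be in scope (copied from the example).
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_distributed_channel_resolver(InMemoryChannelResolver::new())
        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
        .build();
    let ctx = SessionContext::from(state);

    ctx.register_parquet(
        "weather",
        "testdata/weather.parquet",
        ParquetReadOptions::default(),
    )
    .await?;

    // A global COUNT(*) yields exactly one row; the distributed plan runs
    // through the in-memory Arrow Flight endpoint rather than the network.
    let batches = ctx
        .sql("SELECT count(*) FROM weather")
        .await?
        .collect()
        .await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 1);
    Ok(())
}
```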