1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -36,6 +36,7 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
parquet = { version = "55.2.0", optional = true }
arrow = { version = "55.2.0", optional = true }
tokio-stream = { version = "0.1.17", optional = true }
hyper-util = { version = "0.1.16", optional = true }

[features]
integration = [
Expand All @@ -45,6 +46,7 @@ integration = [
"parquet",
"arrow",
"tokio-stream",
"hyper-util"
]
tpch = ["integration"]

Expand All @@ -56,3 +58,4 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
parquet = "55.2.0"
arrow = "55.2.0"
tokio-stream = "0.1.17"
hyper-util = "0.1.16"
72 changes: 72 additions & 0 deletions examples/in_memory.md
@@ -0,0 +1,72 @@
# In-memory cluster example

This example shows how queries can be run in a distributed context without performing any
network IO for communication between workers.

This is especially useful for testing: no servers need to be spawned on localhost ports,
the setup is straightforward, and the code paths exercised in this mode are the same as
when running against an actual distributed cluster.
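
At its core (the full code is in `examples/in_memory_cluster.rs`), the example builds a session
whose channel resolver hands out gRPC channels backed by an in-memory duplex instead of TCP
connections. A minimal sketch of that wiring:

```rust
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};
use std::sync::Arc;

// `InMemoryChannelResolver` is defined in the example: it returns gRPC channels
// backed by a tokio duplex rather than real network connections.
let state = SessionStateBuilder::new()
    .with_default_features()
    .with_distributed_channel_resolver(InMemoryChannelResolver::new())
    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
    .build();

let ctx = SessionContext::from(state);
```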

## Preparation

This example queries a couple of test Parquet files used for integration tests. Those files
are stored with `git lfs`, so they need to be pulled first:

```shell
git lfs checkout
```

### Issuing a distributed SQL query

```shell
cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"'
```

Additionally, the `--explain` flag can be passed to render the distributed plan:

```shell
cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --explain
```

### Available tables

Two tables are available in this example:

- `flights_1m`: Flight data with 1m rows

```
FL_DATE [INT32]
DEP_DELAY [INT32]
ARR_DELAY [INT32]
AIR_TIME [INT32]
DISTANCE [INT32]
DEP_TIME [FLOAT]
ARR_TIME [FLOAT]
```

- `weather`: Small dataset of weather data

```
MinTemp [DOUBLE]
MaxTemp [DOUBLE]
Rainfall [DOUBLE]
Evaporation [DOUBLE]
Sunshine [BYTE_ARRAY]
WindGustDir [BYTE_ARRAY]
WindGustSpeed [BYTE_ARRAY]
WindDir9am [BYTE_ARRAY]
WindDir3pm [BYTE_ARRAY]
WindSpeed9am [BYTE_ARRAY]
WindSpeed3pm [INT64]
Humidity9am [INT64]
Humidity3pm [INT64]
Pressure9am [DOUBLE]
Pressure3pm [DOUBLE]
Cloud9am [INT64]
Cloud3pm [INT64]
Temp9am [DOUBLE]
Temp3pm [DOUBLE]
RainToday [BYTE_ARRAY]
RISK_MM [DOUBLE]
RainTomorrow [BYTE_ARRAY]
```
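
As another example, a query against `flights_1m` using the columns listed above (mixed-case
and uppercase column names need double quotes):

```shell
cargo run --example in_memory_cluster -- 'SELECT "DISTANCE", count(*) FROM flights_1m GROUP BY "DISTANCE" ORDER BY count(*) DESC LIMIT 10'
```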
136 changes: 136 additions & 0 deletions examples/in_memory_cluster.rs
@@ -0,0 +1,136 @@
use arrow::util::pretty::pretty_format_batches;
use arrow_flight::flight_service_server::FlightServiceServer;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
};
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use tonic::transport::{Endpoint, Server};

#[derive(StructOpt)]
#[structopt(
name = "run",
about = "An in-memory cluster Distributed DataFusion runner"
)]
struct Args {
#[structopt()]
query: String,

#[structopt(long)]
explain: bool,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
.build();

let ctx = SessionContext::from(state);

ctx.register_parquet(
"flights_1m",
"testdata/flights-1m.parquet",
ParquetReadOptions::default(),
)
.await?;

ctx.register_parquet(
"weather",
"testdata/weather.parquet",
ParquetReadOptions::default(),
)
.await?;

let df = ctx.sql(&args.query).await?;
if args.explain {
let plan = df.create_physical_plan().await?;
let display = displayable(plan.as_ref()).indent(true).to_string();
println!("{display}");
} else {
let stream = df.execute_stream().await?;
let batches = stream.try_collect::<Vec<_>>().await?;
let formatted = pretty_format_batches(&batches)?;
println!("{formatted}");
}
Ok(())
}

// This URL is never actually dialed: it only gives the lazy `Endpoint` and the
// `ChannelResolver` a syntactically valid address, since all traffic goes through
// the in-memory duplex created below.
const DUMMY_URL: &str = "http://localhost:50051";

/// [ChannelResolver] implementation that returns gRPC clients backed by an in-memory
/// tokio duplex rather than a TCP connection.
#[derive(Clone)]
struct InMemoryChannelResolver {
channel: BoxCloneSyncChannel,
}

impl InMemoryChannelResolver {
fn new() -> Self {
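        // Create an in-memory duplex pipe: the `client` half backs the gRPC channel
        // used by this resolver, and the `server` half feeds the Arrow Flight server
        // spawned below.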
let (client, server) = tokio::io::duplex(1024 * 1024);

        // The duplex client half can only be handed out once, so it is kept in an
        // `Option` that the lazy connector takes on its first (and only) use.
        let mut client = Some(client);
let channel = Endpoint::try_from(DUMMY_URL)
.expect("Invalid dummy URL for building an endpoint. This should never happen")
.connect_with_connector_lazy(tower::service_fn(move |_| {
let client = client
.take()
.expect("Client taken twice. This should never happen");
async move { Ok::<_, std::io::Error>(TokioIo::new(client)) }
}));

let this = Self {
channel: BoxCloneSyncChannel::new(channel),
};
let this_clone = this.clone();

        // This endpoint plays the role of a worker node. For every request it builds a
        // session state wired with this same in-memory resolver, so any further
        // worker-to-worker communication also stays in-process.
        let endpoint =
            ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
let this = this.clone();
async move {
let builder = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(this)
.with_runtime_env(ctx.runtime_env.clone());
Ok(builder.build())
}
})
.unwrap();

        // Run the Flight server on the duplex server half: `serve_with_incoming` takes a
        // stream of connections, and here that stream yields exactly one in-memory pipe.
        tokio::spawn(async move {
Server::builder()
.add_service(FlightServiceServer::new(endpoint))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
});

this_clone
}
}

#[async_trait]
impl ChannelResolver for InMemoryChannelResolver {
fn get_urls(&self) -> Result<Vec<url::Url>, DataFusionError> {
Ok(vec![url::Url::parse(DUMMY_URL).unwrap()])
}

async fn get_channel_for_url(
&self,
_: &url::Url,
) -> Result<BoxCloneSyncChannel, DataFusionError> {
Ok(self.channel.clone())
}
}