1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -48,6 +48,7 @@ integration = [
]

[dev-dependencies]
structopt = "0.3"
insta = { version = "1.43.1", features = ["filters"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
89 changes: 89 additions & 0 deletions examples/localhost.md
@@ -0,0 +1,89 @@
# Localhost workers example
> **Collaborator:** I have checked out your branch, run the commands below, and got the query result and explain output back. Super cool!


This example executes a SQL query in a distributed context.

For this example to work, it's necessary to spawn some localhost workers using the `localhost_worker.rs` example.

## Preparation

This example queries a couple of test Parquet files we have for integration tests. Those files are stored using `git lfs`,
so pulling them first is necessary.

```shell
git lfs checkout
```

### Spawning the workers

In two different terminals, spawn two `ArrowFlightEndpoint`s:

```shell
cargo run --example localhost_worker -- 8080 --cluster-ports 8080,8081
```

```shell
cargo run --example localhost_worker -- 8081 --cluster-ports 8080,8081
```

- The positional numeric argument is the port on which each Arrow Flight endpoint will listen.
- The `--cluster-ports` parameter tells each Arrow Flight endpoint about all the available localhost workers in the cluster.

### Issuing a distributed SQL query

Now, DataFusion queries can be issued using these workers as part of the cluster.

```shell
cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081
```

The head stage (the one that outputs data to the user) will be executed locally in the same process as that `cargo run`
command, but further stages will be delegated to the workers running on ports 8080 and 8081.

Additionally, the `--explain` flag can be passed to render the distributed plan:

```shell
cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081 --explain
```
> **Collaborator:** These commands are so cool. Do you think that, as near-future work, we are ready to work on supporting the distributed-datafusion-cli defined in #4?
>
> Maybe we could add a new distributed-datafusion-cli folder, similar to datafusion-cli, to support this?

> **Collaborator (author):** 🤔 I'm not sure what value distributed-datafusion-cli would bring on top of the normal datafusion-cli. As this is just a library for distributing queries, the concept of a CLI becomes less relevant in this context.
>
> If people want to use a CLI anyway, hopefully we can just reuse the normal datafusion-cli rather than building our own thing.

> **Collaborator:** Reusing datafusion-cli is a good option as long as we provide a sensible default (e.g. 3 workers) and easy-to-customize distributed settings.
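
As a rough sketch of what that reuse might look like: the helper below is hypothetical (`default_distributed_context` is not part of this PR), and it assumes the `LocalhostChannelResolver` struct defined in `localhost_run.rs` further down is in scope. It just mirrors the wiring that `localhost_run.rs` already does.

```rust
use std::sync::Arc;

use dashmap::DashMap;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};

// Hypothetical helper: build a SessionContext wired to a default localhost
// cluster (e.g. three workers on ports 8080-8082), reusing the
// `LocalhostChannelResolver` defined in `localhost_run.rs`.
fn default_distributed_context(ports: &[u16]) -> SessionContext {
    let resolver = LocalhostChannelResolver {
        ports: ports.to_vec(),
        cached: DashMap::new(),
    };
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_distributed_channel_resolver(resolver)
        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
        .build();
    SessionContext::from(state)
}
```

Called as `default_distributed_context(&[8080, 8081, 8082])`, this would give datafusion-cli-style code the same distributed planning that the examples in this PR use.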


### Available tables

Two tables are available in this example:

- `flights_1m`: Flight data with 1m rows

```
FL_DATE [INT32]
DEP_DELAY [INT32]
ARR_DELAY [INT32]
AIR_TIME [INT32]
DISTANCE [INT32]
DEP_TIME [FLOAT]
ARR_TIME [FLOAT]
```

- `weather`: Small dataset of weather data

```
MinTemp [DOUBLE]
MaxTemp [DOUBLE]
Rainfall [DOUBLE]
Evaporation [DOUBLE]
Sunshine [BYTE_ARRAY]
WindGustDir [BYTE_ARRAY]
WindGustSpeed [BYTE_ARRAY]
WindDir9am [BYTE_ARRAY]
WindDir3pm [BYTE_ARRAY]
WindSpeed9am [BYTE_ARRAY]
WindSpeed3pm [INT64]
Humidity9am [INT64]
Humidity3pm [INT64]
Pressure9am [DOUBLE]
Pressure3pm [DOUBLE]
Cloud9am [INT64]
Cloud3pm [INT64]
Temp9am [DOUBLE]
Temp3pm [DOUBLE]
RainToday [BYTE_ARRAY]
RISK_MM [DOUBLE]
RainTomorrow [BYTE_ARRAY]
```
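
For instance, a hypothetical aggregation over the `flights_1m` schema above (assuming the two workers from the previous section are still running):

```shell
cargo run --example localhost_run -- 'SELECT "FL_DATE", avg("ARR_DELAY") FROM flights_1m GROUP BY "FL_DATE"' --cluster-ports 8080,8081
```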
106 changes: 106 additions & 0 deletions examples/localhost_run.rs
@@ -0,0 +1,106 @@
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use dashmap::{DashMap, Entry};
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use tonic::transport::Channel;
use url::Url;

#[derive(StructOpt)]
#[structopt(name = "run", about = "A localhost Distributed DataFusion runner")]
struct Args {
#[structopt()]
query: String,

// --cluster-ports 8080,8081,8082
#[structopt(long = "cluster-ports", use_delimiter = true)]
cluster_ports: Vec<u16>,

#[structopt(long)]
explain: bool,
> **Collaborator:** Nice

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let localhost_resolver = LocalhostChannelResolver {
ports: args.cluster_ports,
cached: DashMap::new(),
};

let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
.build();

let ctx = SessionContext::from(state);

ctx.register_parquet(
"flights_1m",
"testdata/flights-1m.parquet",
ParquetReadOptions::default(),
)
.await?;

ctx.register_parquet(
"weather",
"testdata/weather.parquet",
ParquetReadOptions::default(),
)
.await?;

let df = ctx.sql(&args.query).await?;
if args.explain {
let plan = df.create_physical_plan().await?;
let display = displayable(plan.as_ref()).indent(true).to_string();
println!("{display}");
} else {
let stream = df.execute_stream().await?;
let batches = stream.try_collect::<Vec<_>>().await?;
let formatted = pretty_format_batches(&batches)?;
println!("{formatted}");
}
Ok(())
}

#[derive(Clone)]
struct LocalhostChannelResolver {
ports: Vec<u16>,
cached: DashMap<Url, BoxCloneSyncChannel>,
}

#[async_trait]
impl ChannelResolver for LocalhostChannelResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
Ok(self
.ports
.iter()
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
.collect())
}

async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
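// Cache one channel per URL so repeated lookups reuse the same lazily-connected channel.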
match self.cached.entry(url.clone()) {
Entry::Occupied(v) => Ok(v.get().clone()),
Entry::Vacant(v) => {
let channel = Channel::from_shared(url.to_string())
.unwrap()
.connect_lazy();
let channel = BoxCloneSyncChannel::new(channel);
v.insert(channel.clone());
Ok(channel)
}
}
}
}
84 changes: 84 additions & 0 deletions examples/localhost_worker.rs
@@ -0,0 +1,84 @@
use arrow_flight::flight_service_server::FlightServiceServer;
use async_trait::async_trait;
use dashmap::{DashMap, Entry};
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion_distributed::{
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
DistributedSessionBuilderContext,
};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use structopt::StructOpt;
use tonic::transport::{Channel, Server};
use url::Url;

#[derive(StructOpt)]
#[structopt(name = "localhost_worker", about = "A localhost DataFusion worker")]
struct Args {
#[structopt(default_value = "8080")]
port: u16,

// --cluster-ports 8080,8081,8082
#[structopt(long = "cluster-ports", use_delimiter = true)]
cluster_ports: Vec<u16>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let localhost_resolver = LocalhostChannelResolver {
ports: args.cluster_ports,
cached: DashMap::new(),
};

let endpoint = ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
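// Session builder for the endpoint: creates the SessionState for incoming requests, wiring in the shared channel resolver.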
let local_host_resolver = localhost_resolver.clone();
async move {
Ok(SessionStateBuilder::new()
.with_runtime_env(ctx.runtime_env)
.with_distributed_channel_resolver(local_host_resolver)
.with_default_features()
.build())
}
})?;

Server::builder()
.add_service(FlightServiceServer::new(endpoint))
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
.await?;

Ok(())
}

#[derive(Clone)]
struct LocalhostChannelResolver {
ports: Vec<u16>,
cached: DashMap<Url, BoxCloneSyncChannel>,
}

#[async_trait]
impl ChannelResolver for LocalhostChannelResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
Ok(self
.ports
.iter()
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
.collect())
}

async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
match self.cached.entry(url.clone()) {
Entry::Occupied(v) => Ok(v.get().clone()),
Entry::Vacant(v) => {
let channel = Channel::from_shared(url.to_string())
.unwrap()
.connect_lazy();
let channel = BoxCloneSyncChannel::new(channel);
v.insert(channel.clone());
Ok(channel)
}
}
}
}
4 changes: 2 additions & 2 deletions src/flight_service/do_get.rs
@@ -181,7 +181,7 @@ mod tests {

// Create ArrowFlightEndpoint with DefaultSessionBuilder
let endpoint =
ArrowFlightEndpoint::new(DefaultSessionBuilder).expect("Failed to create endpoint");
ArrowFlightEndpoint::try_new(DefaultSessionBuilder).expect("Failed to create endpoint");

// Create 3 tasks with 3 partitions each.
let num_tasks = 3;
@@ -210,7 +210,7 @@
tasks,
};

let task_keys = vec![
let task_keys = [
StageKey {
query_id: query_id_uuid.to_string(),
stage_id,
2 changes: 1 addition & 1 deletion src/flight_service/service.rs
@@ -36,7 +36,7 @@ pub struct ArrowFlightEndpoint {
}

impl ArrowFlightEndpoint {
pub fn new(
pub fn try_new(
session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
) -> Result<Self, DataFusionError> {
let ttl_map = TTLMap::try_new(TTLMapConfig::default())?;
2 changes: 1 addition & 1 deletion src/test_utils/localhost.rs
@@ -109,7 +109,7 @@ pub async fn spawn_flight_service(
session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
incoming: TcpListener,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let endpoint = ArrowFlightEndpoint::new(session_builder)?;
let endpoint = ArrowFlightEndpoint::try_new(session_builder)?;

let incoming = tokio_stream::wrappers::TcpListenerStream::new(incoming);
