Skip to content

Commit 39697ab

Browse files
authored
Add localhost_run.rs and localhost_worker.rs examples (#111)
* Add some examples * Add markdown example * Fix examples and conflicts * Improve docs
1 parent 47bf486 commit 39697ab

File tree

8 files changed

+285
-4
lines changed

8 files changed

+285
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ integration = [
4848
]
4949

5050
[dev-dependencies]
51+
structopt = "0.3"
5152
insta = { version = "1.43.1", features = ["filters"] }
5253
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
5354
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }

examples/localhost.md

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Localhost workers example
2+
3+
This example executes a SQL query in a distributed context.
4+
5+
For this example to work, it's necessary to spawn some localhost workers with the `localhost_worker.rs` example.
6+
7+
## Preparation
8+
9+
This example queries a couple of test parquet files we have for integration tests, and those files are stored using `git lfs`,
10+
so pulling them first is necessary.
11+
12+
```shell
13+
git lfs checkout
14+
```
15+
16+
### Spawning the workers
17+
18+
In two different terminals, spawn two `ArrowFlightEndpoint`s
19+
20+
```shell
21+
cargo run --example localhost_worker -- 8080 --cluster-ports 8080,8081
22+
```
23+
24+
```shell
25+
cargo run --example localhost_worker -- 8081 --cluster-ports 8080,8081
26+
```
27+
28+
- The positional numeric argument is the port on which each Arrow Flight endpoint will listen
29+
- The `--cluster-ports` parameter tells the Arrow Flight endpoint all the available localhost workers in the cluster
30+
31+
### Issuing a distributed SQL query
32+
33+
Now, DataFusion queries can be issued using these workers as part of the cluster.
34+
35+
```shell
36+
cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081
37+
```
38+
39+
The head stage (the one that outputs data to the user) will be executed locally in the same process as that `cargo run`
40+
command, but further stages will be delegated to the workers running on ports 8080 and 8081.
41+
42+
Additionally, the `--explain` flag can be passed to render the distributed plan:
43+
44+
```shell
45+
cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081 --explain
46+
```
47+
48+
### Available tables
49+
50+
Two tables are available in this example:
51+
52+
- `flights_1m`: Flight data with 1m rows
53+
54+
```
55+
FL_DATE [INT32]
56+
DEP_DELAY [INT32]
57+
ARR_DELAY [INT32]
58+
AIR_TIME [INT32]
59+
DISTANCE [INT32]
60+
DEP_TIME [FLOAT]
61+
ARR_TIME [FLOAT]
62+
```
63+
64+
- `weather`: Small dataset of weather data
65+
66+
```
67+
MinTemp [DOUBLE]
68+
MaxTemp [DOUBLE]
69+
Rainfall [DOUBLE]
70+
Evaporation [DOUBLE]
71+
Sunshine [BYTE_ARRAY]
72+
WindGustDir [BYTE_ARRAY]
73+
WindGustSpeed [BYTE_ARRAY]
74+
WindDir9am [BYTE_ARRAY]
75+
WindDir3pm [BYTE_ARRAY]
76+
WindSpeed9am [BYTE_ARRAY]
77+
WindSpeed3pm [INT64]
78+
Humidity9am [INT64]
79+
Humidity3pm [INT64]
80+
Pressure9am [DOUBLE]
81+
Pressure3pm [DOUBLE]
82+
Cloud9am [INT64]
83+
Cloud3pm [INT64]
84+
Temp9am [DOUBLE]
85+
Temp3pm [DOUBLE]
86+
RainToday [BYTE_ARRAY]
87+
RISK_MM [DOUBLE]
88+
RainTomorrow [BYTE_ARRAY]
89+
```

examples/localhost_run.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use arrow::util::pretty::pretty_format_batches;
2+
use async_trait::async_trait;
3+
use dashmap::{DashMap, Entry};
4+
use datafusion::common::DataFusionError;
5+
use datafusion::execution::SessionStateBuilder;
6+
use datafusion::physical_plan::displayable;
7+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
8+
use datafusion_distributed::{
9+
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule,
10+
};
11+
use futures::TryStreamExt;
12+
use std::error::Error;
13+
use std::sync::Arc;
14+
use structopt::StructOpt;
15+
use tonic::transport::Channel;
16+
use url::Url;
17+
18+
#[derive(StructOpt)]
19+
#[structopt(name = "run", about = "A localhost Distributed DataFusion runner")]
20+
struct Args {
21+
#[structopt()]
22+
query: String,
23+
24+
// --cluster-ports 8080,8081,8082
25+
#[structopt(long = "cluster-ports", use_delimiter = true)]
26+
cluster_ports: Vec<u16>,
27+
28+
#[structopt(long)]
29+
explain: bool,
30+
}
31+
32+
#[tokio::main]
33+
async fn main() -> Result<(), Box<dyn Error>> {
34+
let args = Args::from_args();
35+
36+
let localhost_resolver = LocalhostChannelResolver {
37+
ports: args.cluster_ports,
38+
cached: DashMap::new(),
39+
};
40+
41+
let state = SessionStateBuilder::new()
42+
.with_default_features()
43+
.with_distributed_channel_resolver(localhost_resolver)
44+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
45+
.build();
46+
47+
let ctx = SessionContext::from(state);
48+
49+
ctx.register_parquet(
50+
"flights_1m",
51+
"testdata/flights-1m.parquet",
52+
ParquetReadOptions::default(),
53+
)
54+
.await?;
55+
56+
ctx.register_parquet(
57+
"weather",
58+
"testdata/weather.parquet",
59+
ParquetReadOptions::default(),
60+
)
61+
.await?;
62+
63+
let df = ctx.sql(&args.query).await?;
64+
if args.explain {
65+
let plan = df.create_physical_plan().await?;
66+
let display = displayable(plan.as_ref()).indent(true).to_string();
67+
println!("{display}");
68+
} else {
69+
let stream = df.execute_stream().await?;
70+
let batches = stream.try_collect::<Vec<_>>().await?;
71+
let formatted = pretty_format_batches(&batches)?;
72+
println!("{formatted}");
73+
}
74+
Ok(())
75+
}
76+
77+
#[derive(Clone)]
78+
struct LocalhostChannelResolver {
79+
ports: Vec<u16>,
80+
cached: DashMap<Url, BoxCloneSyncChannel>,
81+
}
82+
83+
#[async_trait]
84+
impl ChannelResolver for LocalhostChannelResolver {
85+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
86+
Ok(self
87+
.ports
88+
.iter()
89+
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
90+
.collect())
91+
}
92+
93+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
94+
match self.cached.entry(url.clone()) {
95+
Entry::Occupied(v) => Ok(v.get().clone()),
96+
Entry::Vacant(v) => {
97+
let channel = Channel::from_shared(url.to_string())
98+
.unwrap()
99+
.connect_lazy();
100+
let channel = BoxCloneSyncChannel::new(channel);
101+
v.insert(channel.clone());
102+
Ok(channel)
103+
}
104+
}
105+
}
106+
}

examples/localhost_worker.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use arrow_flight::flight_service_server::FlightServiceServer;
2+
use async_trait::async_trait;
3+
use dashmap::{DashMap, Entry};
4+
use datafusion::common::DataFusionError;
5+
use datafusion::execution::SessionStateBuilder;
6+
use datafusion_distributed::{
7+
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
8+
DistributedSessionBuilderContext,
9+
};
10+
use std::error::Error;
11+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
12+
use structopt::StructOpt;
13+
use tonic::transport::{Channel, Server};
14+
use url::Url;
15+
16+
#[derive(StructOpt)]
17+
#[structopt(name = "localhost_worker", about = "A localhost DataFusion worker")]
18+
struct Args {
19+
#[structopt(default_value = "8080")]
20+
port: u16,
21+
22+
// --cluster-ports 8080,8081,8082
23+
#[structopt(long = "cluster-ports", use_delimiter = true)]
24+
cluster_ports: Vec<u16>,
25+
}
26+
27+
#[tokio::main]
28+
async fn main() -> Result<(), Box<dyn Error>> {
29+
let args = Args::from_args();
30+
31+
let localhost_resolver = LocalhostChannelResolver {
32+
ports: args.cluster_ports,
33+
cached: DashMap::new(),
34+
};
35+
36+
let endpoint = ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
37+
let local_host_resolver = localhost_resolver.clone();
38+
async move {
39+
Ok(SessionStateBuilder::new()
40+
.with_runtime_env(ctx.runtime_env)
41+
.with_distributed_channel_resolver(local_host_resolver)
42+
.with_default_features()
43+
.build())
44+
}
45+
})?;
46+
47+
Server::builder()
48+
.add_service(FlightServiceServer::new(endpoint))
49+
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
50+
.await?;
51+
52+
Ok(())
53+
}
54+
55+
#[derive(Clone)]
56+
struct LocalhostChannelResolver {
57+
ports: Vec<u16>,
58+
cached: DashMap<Url, BoxCloneSyncChannel>,
59+
}
60+
61+
#[async_trait]
62+
impl ChannelResolver for LocalhostChannelResolver {
63+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
64+
Ok(self
65+
.ports
66+
.iter()
67+
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
68+
.collect())
69+
}
70+
71+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
72+
match self.cached.entry(url.clone()) {
73+
Entry::Occupied(v) => Ok(v.get().clone()),
74+
Entry::Vacant(v) => {
75+
let channel = Channel::from_shared(url.to_string())
76+
.unwrap()
77+
.connect_lazy();
78+
let channel = BoxCloneSyncChannel::new(channel);
79+
v.insert(channel.clone());
80+
Ok(channel)
81+
}
82+
}
83+
}
84+
}

src/flight_service/do_get.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ mod tests {
181181

182182
// Create ArrowFlightEndpoint with DefaultSessionBuilder
183183
let endpoint =
184-
ArrowFlightEndpoint::new(DefaultSessionBuilder).expect("Failed to create endpoint");
184+
ArrowFlightEndpoint::try_new(DefaultSessionBuilder).expect("Failed to create endpoint");
185185

186186
// Create 3 tasks with 3 partitions each.
187187
let num_tasks = 3;
@@ -210,7 +210,7 @@ mod tests {
210210
tasks,
211211
};
212212

213-
let task_keys = vec![
213+
let task_keys = [
214214
StageKey {
215215
query_id: query_id_uuid.to_string(),
216216
stage_id,

src/flight_service/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub struct ArrowFlightEndpoint {
3636
}
3737

3838
impl ArrowFlightEndpoint {
39-
pub fn new(
39+
pub fn try_new(
4040
session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
4141
) -> Result<Self, DataFusionError> {
4242
let ttl_map = TTLMap::try_new(TTLMapConfig::default())?;

src/test_utils/localhost.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub async fn spawn_flight_service(
109109
session_builder: impl DistributedSessionBuilder + Send + Sync + 'static,
110110
incoming: TcpListener,
111111
) -> Result<(), Box<dyn Error + Send + Sync>> {
112-
let endpoint = ArrowFlightEndpoint::new(session_builder)?;
112+
let endpoint = ArrowFlightEndpoint::try_new(session_builder)?;
113113

114114
let incoming = tokio_stream::wrappers::TcpListenerStream::new(incoming);
115115

0 commit comments

Comments
 (0)