Skip to content

Commit 43c5bc6

Browse files
committed
Add some examples
1 parent 47bf486 commit 43c5bc6

File tree

4 files changed

+192
-0
lines changed

4 files changed

+192
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ integration = [
4848
]
4949

5050
[dev-dependencies]
51+
structopt = "0.3"
5152
insta = { version = "1.43.1", features = ["filters"] }
5253
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
5354
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }

examples/localhost_run.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use arrow::util::pretty::pretty_format_batches;
2+
use async_trait::async_trait;
3+
use dashmap::{DashMap, Entry};
4+
use datafusion::common::DataFusionError;
5+
use datafusion::execution::SessionStateBuilder;
6+
use datafusion::physical_plan::displayable;
7+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
8+
use datafusion_distributed::{
9+
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule,
10+
};
11+
use futures::TryStreamExt;
12+
use std::error::Error;
13+
use std::sync::Arc;
14+
use structopt::StructOpt;
15+
use tonic::transport::Channel;
16+
use url::Url;
17+
18+
#[derive(StructOpt)]
19+
#[structopt(name = "run", about = "A localhost Distributed DataFusion runner")]
20+
struct Args {
21+
#[structopt()]
22+
query: String,
23+
24+
// --cluster-ports 8080,8081,8082
25+
#[structopt(long = "cluster-ports", use_delimiter = true)]
26+
cluster_ports: Vec<u16>,
27+
28+
#[structopt(long)]
29+
explain: bool,
30+
}
31+
32+
#[tokio::main]
33+
async fn main() -> Result<(), Box<dyn Error>> {
34+
let args = Args::from_args();
35+
36+
let localhost_resolver = LocalhostChannelResolver {
37+
ports: args.cluster_ports,
38+
cached: DashMap::new(),
39+
};
40+
41+
let state = SessionStateBuilder::new()
42+
.with_default_features()
43+
.with_distributed_channel_resolver(localhost_resolver)
44+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
45+
.build();
46+
47+
let ctx = SessionContext::from(state);
48+
49+
ctx.register_parquet(
50+
"flights_1m",
51+
"testdata/flights-1m.parquet",
52+
ParquetReadOptions::default(),
53+
)
54+
.await?;
55+
56+
ctx.register_parquet(
57+
"weather",
58+
"testdata/weather.parquet",
59+
ParquetReadOptions::default(),
60+
)
61+
.await?;
62+
63+
let df = ctx.sql(&args.query).await?;
64+
if args.explain {
65+
let plan = df.create_physical_plan().await?;
66+
let display = displayable(plan.as_ref()).indent(true).to_string();
67+
println!("{display}");
68+
} else {
69+
let stream = df.execute_stream().await?;
70+
let batches = stream.try_collect::<Vec<_>>().await?;
71+
let formatted = pretty_format_batches(&batches)?;
72+
println!("{formatted}");
73+
}
74+
Ok(())
75+
}
76+
77+
#[derive(Clone)]
78+
struct LocalhostChannelResolver {
79+
ports: Vec<u16>,
80+
cached: DashMap<Url, BoxCloneSyncChannel>,
81+
}
82+
83+
#[async_trait]
84+
impl ChannelResolver for LocalhostChannelResolver {
85+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
86+
Ok(self
87+
.ports
88+
.iter()
89+
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
90+
.collect())
91+
}
92+
93+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
94+
match self.cached.entry(url.clone()) {
95+
Entry::Occupied(v) => Ok(v.get().clone()),
96+
Entry::Vacant(v) => {
97+
let channel = Channel::from_shared(url.to_string())
98+
.unwrap()
99+
.connect_lazy();
100+
let channel = BoxCloneSyncChannel::new(channel);
101+
v.insert(channel.clone());
102+
Ok(channel)
103+
}
104+
}
105+
}
106+
}

examples/localhost_worker.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use arrow_flight::flight_service_server::FlightServiceServer;
2+
use async_trait::async_trait;
3+
use dashmap::{DashMap, Entry};
4+
use datafusion::common::DataFusionError;
5+
use datafusion::execution::SessionStateBuilder;
6+
use datafusion_distributed::{
7+
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
8+
DistributedSessionBuilderContext,
9+
};
10+
use std::error::Error;
11+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
12+
use structopt::StructOpt;
13+
use tonic::transport::{Channel, Server};
14+
use url::Url;
15+
16+
#[derive(StructOpt)]
17+
#[structopt(name = "localhost_worker", about = "A localhost DataFusion worker")]
18+
struct Args {
19+
#[structopt(default_value = "8080")]
20+
port: u16,
21+
22+
// --cluster-ports 8080,8081,8082
23+
#[structopt(long = "cluster-ports", use_delimiter = true)]
24+
cluster_ports: Vec<u16>,
25+
}
26+
27+
#[tokio::main]
28+
async fn main() -> Result<(), Box<dyn Error>> {
29+
let args = Args::from_args();
30+
31+
let localhost_resolver = LocalhostChannelResolver {
32+
ports: args.cluster_ports,
33+
cached: DashMap::new(),
34+
};
35+
36+
let endpoint = ArrowFlightEndpoint::new(move |ctx: DistributedSessionBuilderContext| {
37+
let local_host_resolver = localhost_resolver.clone();
38+
async move {
39+
Ok(SessionStateBuilder::new()
40+
.with_runtime_env(ctx.runtime_env)
41+
.with_distributed_channel_resolver(local_host_resolver)
42+
.with_default_features()
43+
.build())
44+
}
45+
});
46+
47+
Server::builder()
48+
.add_service(FlightServiceServer::new(endpoint))
49+
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
50+
.await?;
51+
52+
Ok(())
53+
}
54+
55+
#[derive(Clone)]
56+
struct LocalhostChannelResolver {
57+
ports: Vec<u16>,
58+
cached: DashMap<Url, BoxCloneSyncChannel>,
59+
}
60+
61+
#[async_trait]
62+
impl ChannelResolver for LocalhostChannelResolver {
63+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
64+
Ok(self
65+
.ports
66+
.iter()
67+
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
68+
.collect())
69+
}
70+
71+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError> {
72+
match self.cached.entry(url.clone()) {
73+
Entry::Occupied(v) => Ok(v.get().clone()),
74+
Entry::Vacant(v) => {
75+
let channel = Channel::from_shared(url.to_string())
76+
.unwrap()
77+
.connect_lazy();
78+
let channel = BoxCloneSyncChannel::new(channel);
79+
v.insert(channel.clone());
80+
Ok(channel)
81+
}
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)