Skip to content

Commit 224370e

Browse files
authored
Adds in-memory example (#132)
1 parent bad700d commit 224370e

File tree

4 files changed

+212
-0
lines changed

4 files changed

+212
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
3636
parquet = { version = "55.2.0", optional = true }
3737
arrow = { version = "55.2.0", optional = true }
3838
tokio-stream = { version = "0.1.17", optional = true }
39+
hyper-util = { version = "0.1.16", optional = true }
3940

4041
[features]
4142
integration = [
@@ -45,6 +46,7 @@ integration = [
4546
"parquet",
4647
"arrow",
4748
"tokio-stream",
49+
"hyper-util"
4850
]
4951
tpch = ["integration"]
5052

@@ -56,3 +58,4 @@ tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82
5658
parquet = "55.2.0"
5759
arrow = "55.2.0"
5860
tokio-stream = "0.1.17"
61+
hyper-util = "0.1.16"

examples/in_memory.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# In-memory cluster example
2+
3+
This example shows how queries can be run in a distributed context without performing any
4+
network IO for communicating between workers.
5+
6+
This is especially useful for testing, as no servers need to be spawned on localhost ports,
7+
the setup is quite easy, and the code coverage for running in this mode is the same as
8+
running in an actual distributed cluster.
9+
10+
## Preparation
11+
12+
This example queries a couple of test parquet files we have for integration tests, and those files are stored using `git lfs`,
13+
so pulling them first is necessary.
14+
15+
```shell
16+
git lfs checkout
17+
```
18+
19+
### Issuing a distributed SQL query
20+
21+
```shell
22+
cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"'
23+
```
24+
25+
Additionally, the `--explain` flag can be passed to render the distributed plan:
26+
27+
```shell
28+
cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --explain
29+
```
30+
31+
### Available tables
32+
33+
Two tables are available in this example:
34+
35+
- `flights_1m`: Flight data with 1m rows
36+
37+
```
38+
FL_DATE [INT32]
39+
DEP_DELAY [INT32]
40+
ARR_DELAY [INT32]
41+
AIR_TIME [INT32]
42+
DISTANCE [INT32]
43+
DEP_TIME [FLOAT]
44+
ARR_TIME [FLOAT]
45+
```
46+
47+
- `weather`: Small dataset of weather data
48+
49+
```
50+
MinTemp [DOUBLE]
51+
MaxTemp [DOUBLE]
52+
Rainfall [DOUBLE]
53+
Evaporation [DOUBLE]
54+
Sunshine [BYTE_ARRAY]
55+
WindGustDir [BYTE_ARRAY]
56+
WindGustSpeed [BYTE_ARRAY]
57+
WindDir9am [BYTE_ARRAY]
58+
WindDir3pm [BYTE_ARRAY]
59+
WindSpeed9am [BYTE_ARRAY]
60+
WindSpeed3pm [INT64]
61+
Humidity9am [INT64]
62+
Humidity3pm [INT64]
63+
Pressure9am [DOUBLE]
64+
Pressure3pm [DOUBLE]
65+
Cloud9am [INT64]
66+
Cloud3pm [INT64]
67+
Temp9am [DOUBLE]
68+
Temp3pm [DOUBLE]
69+
RainToday [BYTE_ARRAY]
70+
RISK_MM [DOUBLE]
71+
RainTomorrow [BYTE_ARRAY]
72+
```

examples/in_memory_cluster.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use arrow::util::pretty::pretty_format_batches;
2+
use arrow_flight::flight_service_server::FlightServiceServer;
3+
use async_trait::async_trait;
4+
use datafusion::common::DataFusionError;
5+
use datafusion::execution::SessionStateBuilder;
6+
use datafusion::physical_plan::displayable;
7+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
8+
use datafusion_distributed::{
9+
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
10+
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
11+
};
12+
use futures::TryStreamExt;
13+
use hyper_util::rt::TokioIo;
14+
use std::error::Error;
15+
use std::sync::Arc;
16+
use structopt::StructOpt;
17+
use tonic::transport::{Endpoint, Server};
18+
19+
// Command-line arguments for the in-memory cluster runner.
#[derive(StructOpt)]
#[structopt(
    name = "run",
    about = "An in-memory cluster Distributed DataFusion runner"
)]
struct Args {
    // SQL query to run against the in-memory cluster (positional argument).
    #[structopt()]
    query: String,

    // If set, print the distributed physical plan instead of executing the query.
    #[structopt(long)]
    explain: bool,
}
31+
32+
#[tokio::main]
33+
async fn main() -> Result<(), Box<dyn Error>> {
34+
let args = Args::from_args();
35+
36+
let state = SessionStateBuilder::new()
37+
.with_default_features()
38+
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
39+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
40+
.build();
41+
42+
let ctx = SessionContext::from(state);
43+
44+
ctx.register_parquet(
45+
"flights_1m",
46+
"testdata/flights-1m.parquet",
47+
ParquetReadOptions::default(),
48+
)
49+
.await?;
50+
51+
ctx.register_parquet(
52+
"weather",
53+
"testdata/weather.parquet",
54+
ParquetReadOptions::default(),
55+
)
56+
.await?;
57+
58+
let df = ctx.sql(&args.query).await?;
59+
if args.explain {
60+
let plan = df.create_physical_plan().await?;
61+
let display = displayable(plan.as_ref()).indent(true).to_string();
62+
println!("{display}");
63+
} else {
64+
let stream = df.execute_stream().await?;
65+
let batches = stream.try_collect::<Vec<_>>().await?;
66+
let formatted = pretty_format_batches(&batches)?;
67+
println!("{formatted}");
68+
}
69+
Ok(())
70+
}
71+
// Placeholder worker URL: no TCP connection is ever opened to it; it only serves
// as the address handed out (and looked up) by the in-memory channel resolver.
const DUMMY_URL: &str = "http://localhost:50051";
74+
/// [ChannelResolver] implementation that returns gRPC clients backed by an in-memory
/// tokio duplex rather than a TCP connection.
#[derive(Clone)]
struct InMemoryChannelResolver {
    // Single shared channel that every "worker" lookup resolves to; cloning is cheap.
    channel: BoxCloneSyncChannel,
}
80+
81+
impl InMemoryChannelResolver {
82+
fn new() -> Self {
83+
let (client, server) = tokio::io::duplex(1024 * 1024);
84+
85+
let mut client = Some(client);
86+
let channel = Endpoint::try_from(DUMMY_URL)
87+
.expect("Invalid dummy URL for building an endpoint. This should never happen")
88+
.connect_with_connector_lazy(tower::service_fn(move |_| {
89+
let client = client
90+
.take()
91+
.expect("Client taken twice. This should never happen");
92+
async move { Ok::<_, std::io::Error>(TokioIo::new(client)) }
93+
}));
94+
95+
let this = Self {
96+
channel: BoxCloneSyncChannel::new(channel),
97+
};
98+
let this_clone = this.clone();
99+
100+
let endpoint =
101+
ArrowFlightEndpoint::try_new(move |ctx: DistributedSessionBuilderContext| {
102+
let this = this.clone();
103+
async move {
104+
let builder = SessionStateBuilder::new()
105+
.with_default_features()
106+
.with_distributed_channel_resolver(this)
107+
.with_runtime_env(ctx.runtime_env.clone());
108+
Ok(builder.build())
109+
}
110+
})
111+
.unwrap();
112+
113+
tokio::spawn(async move {
114+
Server::builder()
115+
.add_service(FlightServiceServer::new(endpoint))
116+
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
117+
.await
118+
});
119+
120+
this_clone
121+
}
122+
}
123+
124+
#[async_trait]
125+
impl ChannelResolver for InMemoryChannelResolver {
126+
fn get_urls(&self) -> Result<Vec<url::Url>, DataFusionError> {
127+
Ok(vec![url::Url::parse(DUMMY_URL).unwrap()])
128+
}
129+
130+
async fn get_channel_for_url(
131+
&self,
132+
_: &url::Url,
133+
) -> Result<BoxCloneSyncChannel, DataFusionError> {
134+
Ok(self.channel.clone())
135+
}
136+
}

0 commit comments

Comments
 (0)