
Commit e56530a

Tailor examples and docs

1 parent b7e12af commit e56530a
13 files changed: +111 -105 lines changed
Lines changed: 1 addition & 3 deletions
@@ -1,9 +1,7 @@
-# Introduction
+# Index
 
 Welcome to the DataFusion Distributed contributor guide!
 
-## Contents
-
 - [Setup](setup.md) - Getting started with development
 - [Tests](tests.md) - Running unit and integration tests
 - [Benchmarks](benchmarks.md) - Local and remote performance benchmarks

docs/source/index.rst

Lines changed: 5 additions & 0 deletions
@@ -5,13 +5,18 @@ DataFusion Distributed
 DataFusion Distributed is a library that enhances `Apache DataFusion <https://datafusion.apache.org>`_ with distributed
 capabilities.
 
+These docs will guide you towards using the library for building your own Distributed DataFusion cluster, and
+how to contribute changes to the library yourself.
+
 .. _toc.guide:
 .. toctree::
    :maxdepth: 2
    :caption: User Guide
 
    user-guide/index
    user-guide/getting-started
+   user-guide/task-estimator
+   user-guide/channel-resolver
    user-guide/concepts
    user-guide/how-a-distributed-plan-is-built
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+# Building a ChannelResolver
+
+> WARNING: under construction

docs/source/user-guide/concepts.md

Lines changed: 5 additions & 5 deletions
@@ -13,15 +13,15 @@ These are some terms you should be familiar with before getting started:
 children. Implemented as DataFusion `ExecutionPlan`s: `NetworkShuffle` and `NetworkCoalesce`.
 - `Subplan`: a slice of the overall plan. Each stage will execute a subplan of the overall plan.
 
-## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/distributed_planner/distributed_physical_optimizer_rule.rs#L60)
+## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)
 
 This is a physical optimizer rule that converts a single-node DataFusion query into a distributed query. It reads
 a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.
 
 It builds the distributed plan from bottom to top, and based on the nodes present in the original plan,
 it will inject network boundaries in the appropriate places.
 
-## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/distributed_planner/task_estimator.rs#L40-L40)
+## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/task_estimator.rs)
 
 Estimates the number of tasks required in the leaf stage of a distributed query.
 
@@ -30,7 +30,7 @@ tasks they need to execute based on the amount of data their leaf nodes will pull
 will have their number of tasks reduced or increased depending on how much the cardinality of the data was reduced in
 previous stages.
 
-## [DistributedTaskContext](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/stage.rs#L137-L137)
+## [DistributedTaskContext](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/stage.rs)
 
 An extension present during `ExecutionPlan::execute()` that contains information about the current task in
 which the plan is being executed.
@@ -41,15 +41,15 @@ you are in, you might want to return a different set of data.
 For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of the
 data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.
 
-## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/channel_resolver_ext.rs#L57-L57)
+## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/channel_resolver_ext.rs)
 
 Establishes the number of workers available in the distributed DataFusion cluster, their URLs, and how to connect
 to them.
 
 Each organization does networking differently, so this extension allows you to plug in a custom networking
 implementation that caters to your organization's needs.
 
-## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/distributed_planner/network_boundary.rs#L23-L23)
+## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary.rs)
 
 A network boundary is a node that, instead of pulling data from its children by executing them, serializes its
 children and sends them over the wire so that they are executed on a remote worker.
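The 1/N-slice behavior described in the `DistributedTaskContext` section above can be sketched in plain Rust. This is an illustrative, std-only sketch — `task_row_range` is not part of the library's API, just the arithmetic a task-aware node might use:

```rust
// Sketch of per-task slicing: split `total_rows` into `num_tasks` contiguous
// chunks, handing the remainder rows to the earliest tasks so that chunk
// sizes differ by at most one. Names here are illustrative, not library API.
fn task_row_range(task_index: usize, num_tasks: usize, total_rows: usize) -> (usize, usize) {
    let base = total_rows / num_tasks;
    let rem = total_rows % num_tasks;
    // Tasks with index < rem take one extra row each.
    let start = task_index * base + task_index.min(rem);
    let len = base + if task_index < rem { 1 } else { 0 };
    (start, start + len)
}

fn main() {
    // A 3-task stage over 9 rows: task 0 handles rows 0..3, task 1 rows 3..6,
    // task 2 rows 6..9 — exactly the "first 1/3, last 1/3" idea from the text.
    for task in 0..3 {
        println!("task {task} -> {:?}", task_row_range(task, 3, 9));
    }
}
```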

docs/source/user-guide/getting-started.md

Lines changed: 61 additions & 3 deletions
@@ -1,13 +1,71 @@
 # Getting Started
 
 Rather than being opinionated about your setup and how you serve queries to users,
-Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as workers in the cluster.
+Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as
+workers in the cluster.
 
 This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
 Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
-DataFusion service to it, and spawning it on a port so that it can be reached by other workers in the cluster.
+DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster.
 
-The best way to get started is to check out the available examples:
+For a basic setup, all you need to do is to enrich your DataFusion `SessionStateBuilder` with the tools this project
+ships:
+
+```rust
+let state = SessionStateBuilder::new()
+    .with_distributed_channel_resolver(my_custom_channel_resolver)
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+    .build();
+```
+
+The `my_custom_channel_resolver` variable should be an implementation of
+the [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/6d014eaebd809bcbe676823698838b2c83d93900/src/channel_resolver_ext.rs#L57-L57)
+trait, which tells Distributed DataFusion how to connect to other workers in the cluster.
+
+A very basic example of such an implementation, which resolves workers on the localhost machine, is:
+
+```rust
+#[derive(Clone)]
+struct LocalhostChannelResolver {
+    ports: Vec<u16>,
+    cached: DashMap<Url, FlightServiceClient<BoxCloneSyncChannel>>,
+}
+
+#[async_trait]
+impl ChannelResolver for LocalhostChannelResolver {
+    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
+        Ok(self.ports.iter().map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap()).collect())
+    }
+
+    async fn get_flight_client_for_url(
+        &self,
+        url: &Url,
+    ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
+        match self.cached.entry(url.clone()) {
+            Entry::Occupied(v) => Ok(v.get().clone()),
+            Entry::Vacant(v) => {
+                let channel = Channel::from_shared(url.to_string())
+                    .unwrap()
+                    .connect_lazy();
+                let channel = FlightServiceClient::new(BoxCloneSyncChannel::new(channel));
+                v.insert(channel.clone());
+                Ok(channel)
+            }
+        }
+    }
+}
+```
+
+> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
+
+## Next steps
+
+The next two sections of this guide will walk you through tailoring the library's traits to your own needs:
+
+- [Build your own ChannelResolver](channel-resolver.md)
+- [Build your own TaskEstimator](task-estimator.md)
+
+Here are some other resources in the codebase:
 
 - [In-memory cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/in_memory.md)
 - [Localhost cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/localhost.md)
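The connect-once-then-cache pattern in the `get_flight_client_for_url` implementation above can be sketched with std's `HashMap` entry API (`DashMap` adds thread safety on top of the same idea). In this illustrative sketch a `String` stands in for the real Flight client, and the counter stands in for dialing a connection:

```rust
use std::collections::hash_map::{Entry, HashMap};

// Std-only sketch of the resolver's client cache: each URL is "dialed" at
// most once, and every later lookup reuses the cached client.
struct ClientCache {
    cached: HashMap<String, String>,
    connections_made: usize,
}

impl ClientCache {
    fn get_or_connect(&mut self, url: &str) -> String {
        match self.cached.entry(url.to_string()) {
            // Cache hit: clone the existing (cheap, channel-like) client.
            Entry::Occupied(v) => v.get().clone(),
            // Cache miss: "dial" once and remember the result.
            Entry::Vacant(v) => {
                self.connections_made += 1; // a real resolver would connect here.
                v.insert(format!("client for {url}")).clone()
            }
        }
    }
}

fn main() {
    let mut cache = ClientCache { cached: HashMap::new(), connections_made: 0 };
    cache.get_or_connect("http://localhost:8080");
    cache.get_or_connect("http://localhost:8080"); // hits the cache.
    cache.get_or_connect("http://localhost:8081");
    println!("connections made: {}", cache.connections_made); // 2, not 3.
}
```

Using `connect_lazy()` in the real example means even the first "dial" is cheap: the TCP connection is only established when the first request goes out.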

docs/source/user-guide/index.md

Lines changed: 3 additions & 3 deletions
@@ -1,4 +1,4 @@
-# Introduction
+# Index
 
 Distributed DataFusion is a library that brings distributed capabilities to DataFusion.
 It provides a set of execution plans, optimization rules, configuration extensions, and new traits
@@ -7,8 +7,8 @@ to enable distributed execution.
 This user guide will walk you through using the tools in this project to set up
 your own distributed DataFusion cluster.
 
-## Concepts
-
 - [Concepts](concepts.md)
 - [Getting Started](getting-started.md)
+- [Building a ChannelResolver](channel-resolver.md)
+- [Building a TaskEstimator](task-estimator.md)
 - [How a distributed plan is built](how-a-distributed-plan-is-built.md)
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+# Building a TaskEstimator
+
+> WARNING: under construction

examples/in_memory.md

Lines changed: 7 additions & 19 deletions
@@ -1,9 +1,9 @@
 # In-memory cluster example
 
-This examples shows how queries can be run in a distributed context without making any
+This example shows how queries can be run in a distributed context without making any
 network IO for communicating between workers.
 
-This is specially useful for testing, as no servers need to be spawned in localhost ports,
+This is especially useful for testing, as no servers need to be spawned on localhost ports,
 the setup is quite easy, and the code coverage for running in this mode is the same as
 running in an actual distributed cluster.
 
@@ -19,32 +19,20 @@ git lfs checkout
 
 ### Issuing a distributed SQL query
 
+The `--show-distributed-plan` flag can be passed to render the distributed plan:
+
 ```shell
-cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"'
+cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --show-distributed-plan
 ```
 
-Additionally, the `--explain` flag can be passed to render the distributed plan:
+Not passing the flag will execute the query:
 
 ```shell
-cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --explain
+cargo run --example in_memory_cluster -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"'
 ```
 
 ### Available tables
 
-Two tables are available in this example:
-
-- `flights_1m`: Flight data with 1m rows
-
-```
-FL_DATE [INT32]
-DEP_DELAY [INT32]
-ARR_DELAY [INT32]
-AIR_TIME [INT32]
-DISTANCE [INT32]
-DEP_TIME [FLOAT]
-ARR_TIME [FLOAT]
-```
-
 - `weather`: Small dataset of weather data
 
 ```

examples/in_memory_cluster.rs

Lines changed: 9 additions & 27 deletions
@@ -2,13 +2,12 @@ use arrow::util::pretty::pretty_format_batches;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use async_trait::async_trait;
 use datafusion::common::DataFusionError;
-use datafusion::common::utils::get_available_parallelism;
 use datafusion::execution::SessionStateBuilder;
-use datafusion::physical_plan::displayable;
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use datafusion_distributed::{
     ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
     DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext, create_flight_client,
+    display_plan_ascii,
 };
 use futures::TryStreamExt;
 use hyper_util::rt::TokioIo;
@@ -20,20 +19,16 @@ use tonic::transport::{Endpoint, Server};
 #[derive(StructOpt)]
 #[structopt(
     name = "run",
-    about = "An in-memory cluster Distributed DataFusion runner"
+    about = "Run a query in an in-memory Distributed DataFusion cluster"
 )]
 struct Args {
+    /// The SQL query to run.
     #[structopt()]
     query: String,
 
+    /// Whether the distributed plan should be rendered instead of executing the query.
     #[structopt(long)]
-    explain: bool,
-
-    #[structopt(long)]
-    files_per_task: Option<usize>,
-
-    #[structopt(long)]
-    cardinality_task_sf: Option<f64>,
+    show_distributed_plan: bool,
 }
 
 #[tokio::main]
@@ -44,31 +39,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .with_default_features()
         .with_distributed_channel_resolver(InMemoryChannelResolver::new())
         .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
-        .with_distributed_files_per_task(
-            args.files_per_task.unwrap_or(get_available_parallelism()),
-        )?
-        .with_distributed_cardinality_effect_task_scale_factor(
-            args.cardinality_task_sf.unwrap_or(1.),
-        )?
+        .with_distributed_files_per_task(1)?
         .build();
 
     let ctx = SessionContext::from(state);
 
-    ctx.register_parquet(
-        "flights_1m",
-        "testdata/flights-1m.parquet",
-        ParquetReadOptions::default(),
-    )
-    .await?;
-
     ctx.register_parquet("weather", "testdata/weather", ParquetReadOptions::default())
         .await?;
 
     let df = ctx.sql(&args.query).await?;
-    if args.explain {
+    if args.show_distributed_plan {
         let plan = df.create_physical_plan().await?;
-        let display = displayable(plan.as_ref()).indent(true).to_string();
-        println!("{display}");
+        println!("{}", display_plan_ascii(plan.as_ref(), false));
     } else {
         let stream = df.execute_stream().await?;
         let batches = stream.try_collect::<Vec<_>>().await?;
@@ -133,7 +115,7 @@ impl InMemoryChannelResolver {
 #[async_trait]
 impl ChannelResolver for InMemoryChannelResolver {
     fn get_urls(&self) -> Result<Vec<url::Url>, DataFusionError> {
-        Ok(vec![url::Url::parse(DUMMY_URL).unwrap()])
+        Ok(vec![url::Url::parse(DUMMY_URL).unwrap(); 16]) // simulate 16 workers.
    }
 
     async fn get_flight_client_for_url(
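The example now pins `with_distributed_files_per_task(1)`. The rough intuition behind such a knob — sketched here with illustrative, std-only code that makes no claim about the library's internals — is that a scan's file list gets chunked into tasks, so fewer files per task means more (and smaller) leaf tasks:

```rust
// Illustrative sketch: group `files` into tasks holding at most
// `files_per_task` files each. `files_per_task` must be > 0, since
// `chunks` panics on a zero chunk size.
fn group_into_tasks(files: &[&str], files_per_task: usize) -> Vec<Vec<String>> {
    files
        .chunks(files_per_task)
        .map(|chunk| chunk.iter().map(|f| f.to_string()).collect())
        .collect()
}

fn main() {
    let files = ["a.parquet", "b.parquet", "c.parquet"];
    // files_per_task = 1, as in the example, yields one task per file.
    assert_eq!(group_into_tasks(&files, 1).len(), 3);
    // files_per_task = 2 yields ceil(3 / 2) = 2 tasks.
    assert_eq!(group_into_tasks(&files, 2).len(), 2);
    println!("{:?}", group_into_tasks(&files, 2));
}
```

Pinning the value to 1 keeps the in-memory example deterministic, instead of varying with the machine's available parallelism as the removed flags did.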

examples/localhost.md

Lines changed: 4 additions & 5 deletions
@@ -19,15 +19,14 @@ git lfs checkout
 In two different terminals spawn two ArrowFlightEndpoints
 
 ```shell
-cargo run --example localhost_worker -- 8080 --cluster-ports 8080,8081
+cargo run --example localhost_worker -- 8080
 ```
 
 ```shell
-cargo run --example localhost_worker -- 8081 --cluster-ports 8080,8081
+cargo run --example localhost_worker -- 8081
 ```
 
-- The positional numeric argument is the port in which each Arrow Flight endpoint will listen
-- The `--cluster-ports` parameter tells the Arrow Flight endpoint all the available localhost workers in the cluster
+The positional numeric argument is the port on which each Arrow Flight endpoint will listen.
 
 ### Issuing a distributed SQL query
 
@@ -43,7 +42,7 @@ command, but further stages will be delegated to the workers running on ports 80
 Additionally, the `--show-distributed-plan` flag can be passed to render the distributed plan:
 
 ```shell
-cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081 --explain
+cargo run --example localhost_run -- 'SELECT count(*), "MinTemp" FROM weather GROUP BY "MinTemp"' --cluster-ports 8080,8081 --show-distributed-plan
 ```
 
 ### Available tables
