
Commit 9509b85

Improve docs and add custom execution plan example (#277)
* Support any dataset in benchmarks
* Add TPC-DS schemas
* Improve docs
* Let Claude improve wording
* Add custom_execution_plan.rs example
* Link to custom_execution_plan.rs example
* Respond to PR feedback
1 parent 1fb4daa commit 9509b85

File tree

12 files changed: +719 -162 lines changed


docs/source/index.rst

Lines changed: 2 additions & 3 deletions
@@ -13,13 +13,12 @@ how to contribute changes to the library yourself.
    :maxdepth: 2
    :caption: User Guide

-   user-guide/index
+   user-guide/concepts
    user-guide/getting-started
+   user-guide/worker
    user-guide/worker-resolver
-   user-guide/arrow-flight-endpoint
    user-guide/channel-resolver
    user-guide/task-estimator
-   user-guide/concepts
    user-guide/how-a-distributed-plan-is-built

 .. _toc.contributor-guide:

docs/source/user-guide/channel-resolver.md

Lines changed: 20 additions & 16 deletions
@@ -1,39 +1,43 @@
 # Building a ChannelResolver

-This trait is optional, there's a sane default already in place that should work for most simple use cases.
+This trait is optional—a sensible default implementation exists that handles most use cases.

-A `ChannelResolver` tells Distributed DataFusion how to build an Arrow Flight client baked by a
-[tonic](https://github.com/hyperium/tonic) channel given a worker URL.
+The `ChannelResolver` trait controls how Distributed DataFusion builds Arrow Flight clients backed by
+[Tonic](https://github.com/hyperium/tonic) channels for worker URLs.

-There's a default implementation that simply connects to the given URL, builds the Arrow Flight client instance,
-and caches it so that the same client instance gets reused for a query to the same URL.
+The default implementation connects to each URL, builds an Arrow Flight client, and caches it for reuse on
+subsequent requests to the same URL.

-However, you might want to provide your own implementation. For that you need to take into account the following
-points:
+## Providing your own ChannelResolver
+
+To provide your own implementation, you'll need to take the following points into account:

 - You will need to provide your own implementation in two places:
-  - in the `SessionContext` that handles your queries.
+  - in the `SessionContext` that first initiates and plans your queries.
   - while instantiating the `Worker` with the `from_session_builder()` constructor.
-- If you decide to build it from scratch, make sure that Arrow Flight clients are reused across
-  request rather than always building new ones.
-- You can use this library's `DefaultChannelResolver` as a backbone for your own implementation.
-  If you do that, channel caching will be automatically managed for you.
+- If building from scratch, ensure Arrow Flight clients are reused across requests rather than recreated each time.
+- You can extend `DefaultChannelResolver` as a foundation for custom implementations. This automatically handles
+  gRPC channel reuse.

 ```rust
 #[derive(Clone)]
 struct CustomChannelResolver;

 #[async_trait]
 impl ChannelResolver for CustomChannelResolver {
-    async fn get_flight_client_for_url(&self, url: &Url) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
-        // Build a custom FlightServiceClient wrapped with tower layers or something similar.
+    async fn get_flight_client_for_url(
+        &self,
+        url: &Url,
+    ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
+        // Build a custom FlightServiceClient wrapped with tower
+        // layers or something similar.
         todo!()
     }
 }

 async fn main() {
-    // Make sure you build just one for the whole lifetime of your application, as it needs to be able to
-    // reuse Arrow Flight client instances across different queries.
+    // Build a single instance for your application's lifetime
+    // to enable Arrow Flight client reuse across queries.
     let channel_resolver = CustomChannelResolver;

     let state = SessionStateBuilder::new()
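As an aside, the client-reuse requirement this diff documents can be sketched in plain Rust. The types below (`Client`, `ClientCache`) are hypothetical stand-ins, not the library's API; the real cached type is `FlightServiceClient<BoxCloneSyncChannel>`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an Arrow Flight client.
#[derive(Clone)]
struct Client {
    url: String,
}

// Hands out one shared client per URL, mirroring the reuse requirement
// the docs describe for custom `ChannelResolver` implementations.
#[derive(Default)]
struct ClientCache {
    clients: Mutex<HashMap<String, Arc<Client>>>,
}

impl ClientCache {
    fn get_or_connect(&self, url: &str) -> Arc<Client> {
        let mut clients = self.clients.lock().unwrap();
        clients
            .entry(url.to_string())
            // Only "connect" (here: construct) on the first request for a URL.
            .or_insert_with(|| Arc::new(Client { url: url.to_string() }))
            .clone()
    }
}

fn main() {
    let cache = ClientCache::default();
    let a = cache.get_or_connect("http://localhost:8000");
    let b = cache.get_or_connect("http://localhost:8000");
    // The same client instance is reused for the same URL.
    assert!(Arc::ptr_eq(&a, &b));
    let c = cache.get_or_connect("http://localhost:8001");
    assert!(!Arc::ptr_eq(&a, &c));
    println!("connected to {}", c.url);
}
```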

docs/source/user-guide/concepts.md

Lines changed: 36 additions & 26 deletions
@@ -1,25 +1,50 @@
 # Concepts

-This library contains a set of extensions to DataFusion that allow you to run distributed queries.
+This library is a collection of DataFusion extensions that enable distributed query execution. You can think of
+it as normal DataFusion, with the addition that some nodes are capable of streaming data over the network using
+Arrow Flight instead of through in-memory communication.

-These are some terms you should be familiar with before getting started:
+Key terminology:

 - `Stage`: a portion of the plan separated by a network boundary from other parts of the plan. A plan contains
-  one or more stages.
+  one or more stages, each separated by network boundaries.
 - `Task`: a unit of work in a stage that executes the inner plan in parallel to other tasks within the stage. Each task
-  in a stage is executed by a different worker.
-- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface.
-- `Network boundary`: a node in the plan that streams data from a network interface rather than directly from its
-  children. Implemented as DataFusion `ExecutionPlan`s: `NeworkShuffle` and `NetworkCoalesce`.
-- `Subplan`: a slice of the overall plan. Each stage will execute a subplan of the overall plan.
+  in a stage executes a structurally identical plan in a different worker, passing a `task_index` as a contextual value
+  for making choices about what data should be returned.
+- `Network Boundary`: a node in the plan that streams data from a network interface rather than directly from its
+  children nodes.
+- `Worker`: a physical machine listening for serialized execution plans over an Arrow Flight interface. A task is
+  executed by exactly one worker, but one worker executes many tasks concurrently.
+
+![concepts.png](../_static/images/concepts.png)
+
+You'll see these concepts mentioned extensively across the documentation and the code itself.
+
+# Public API
+
+Some other more tangible concepts are the structs and traits exposed publicly; the most important are:

 ## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)

-This is a physical optimizer rule that converts a single-node DataFusion query into a distributed query. It reads
+A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
 a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.

-It builds the distributed plan from bottom to top, and based on the present nodes in the original plan,
-it will inject network boundaries in the appropriate places.
+It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
+the nodes present in the original plan.
+
+## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)
+
+Arrow Flight server implementation that integrates with the Tonic ecosystem and listens for serialized plans that get
+executed over the wire.
+
+Users are expected to build these and spawn them on ports so that the network boundary nodes can reach them.
+
+## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
+
+Determines the available workers in the Distributed DataFusion cluster by returning their URLs.
+
+Different organizations have different networking requirements—from Kubernetes deployments to cloud provider
+solutions. This trait allows Distributed DataFusion to adapt to various scenarios.

 ## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/task_estimator.rs)

@@ -41,22 +66,7 @@ you are in, you might want to return a different set of data.
 For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of the
 data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.

-## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-
-Establishes the number of workers available in the distributed DataFusion cluster by returning their URLs.
-
-Different organizations have different needs regarding networking. Some might be using Kubernetes, some other might
-be using a cloud provider solution, and this trait allows adapting Distributed DataFusion to the different scenarios.
-
 ## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/channel_resolver.rs)

 Optional extension trait that allows to customize how connections are established to workers. Given one of the
 URLs returned by the `WorkerResolver`, it builds an Arrow Flight client ready for serving queries.
-
-## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary.rs)
-
-A network boundary is a node that, instead of pulling data from its children by executing them, serializes them
-and sends them over the wire so that they are executed on a remote worker.
-
-As a user of distributed DataFusion, you will not need to interact with this trait directly, but you should know
-that different implementations of this trait will be injected into your plans so that queries get distributed.
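The `task_index` mechanic the new docs describe (tasks returning disjoint slices of the data, e.g. task 0 of 3 returning the first 1/3) can be illustrated with a small sketch. `rows_for_task` is a hypothetical helper written for this illustration, not a function in the library:

```rust
use std::ops::Range;

// Given a total row count, a task's index, and the number of tasks in the
// stage, return the non-overlapping row range this task is responsible for.
// Tasks are assumed to take equal-sized contiguous chunks (last one may be
// smaller) — an illustrative assumption, not the library's actual policy.
fn rows_for_task(total_rows: usize, task_index: usize, n_tasks: usize) -> Range<usize> {
    // Ceiling division so every row is covered even when the split is uneven.
    let chunk = (total_rows + n_tasks - 1) / n_tasks;
    let start = (task_index * chunk).min(total_rows);
    let end = ((task_index + 1) * chunk).min(total_rows);
    start..end
}

fn main() {
    // With 9 rows and 3 tasks, each task handles a disjoint third.
    assert_eq!(rows_for_task(9, 0, 3), 0..3);
    assert_eq!(rows_for_task(9, 2, 3), 6..9);
    // Uneven splits still cover every row exactly once.
    assert_eq!(rows_for_task(10, 2, 3), 8..10);
    println!("task ranges computed");
}
```

Each task runs the same plan; only the `task_index` it receives differs, which is what lets structurally identical plans return disjoint data.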
docs/source/user-guide/getting-started.md

Lines changed: 53 additions & 55 deletions
@@ -1,88 +1,86 @@
 # Getting Started

-Rather than being opinionated about your setup and how you serve queries to users,
-Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as
-workers in the cluster.
+Think of this library as vanilla DataFusion, except that certain nodes execute their children on remote
+machines and retrieve data via the Arrow Flight protocol.
+
+This library aims to provide an experience as close as possible to vanilla DataFusion.
+
+## How to use Distributed DataFusion
+
+Rather than imposing constraints on your infrastructure or query serving patterns, Distributed DataFusion
+allows you to plug in your own networking stack and spawn your own gRPC servers that act as workers in the cluster.

 This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
 Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
-DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster.
+DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster. A very
+basic example of this would be:

-For a basic setup, all you need to do is to enrich your DataFusion `SessionStateBuilder` with the tools this project
-ships:
+```rust
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let worker = Worker::default();

-```rs
-let state = SessionStateBuilder::new()
-+   .with_distributed_worker_resolver(my_custom_worker_resolver)
-+   .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
-    .build();
-```
+    Server::builder()
+        .add_service(worker.into_flight_server())
+        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
+        .await?;

-And the `my_custom_worker_resolver` variable should be an implementation of
-the [WorkerResolverResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-trait, which tells Distributed DataFusion how to connect to other workers in the cluster.
+    Ok(())
+}
+```

-A very basic example of such an implementation that resolves workers in the localhost machine is:
+Distributed DataFusion requires knowledge of worker locations. Implement the `WorkerResolver` trait to provide
+this information. Here is a simple example of what this would look like with localhost workers:

 ```rust
 #[derive(Clone)]
 struct LocalhostWorkerResolver {
     ports: Vec<u16>,
 }

-#[async_trait]
-impl ChannelResolver for LocalhostChannelResolver {
+impl WorkerResolver for LocalhostWorkerResolver {
     fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
-        Ok(self.ports.iter().map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap()).collect())
+        Ok(self
+            .ports
+            .iter()
+            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
+            .collect())
     }
 }
 ```

-> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
-
-This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion workers, and it's
-also the user of this library's responsibility to spawn a Tonic server that exposes the worker as an Arrow Flight
-service using Tonic.
+Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
+`SessionStateBuilder` to enable distributed query planning:

-A basic example of such a server is:
-
-```rust
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    let endpoint = Worker::default();
-
-    Server::builder()
-        .add_service(endpoint.into_flight_server())
-        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
-        .await?;
-
-    Ok(())
+```rs
+let localhost_worker_resolver = LocalhostWorkerResolver {
+    ports: vec![8000, 8001, 8002]
 }
-```

-## Next steps
+let state = SessionStateBuilder::new()
+    .with_distributed_worker_resolver(localhost_worker_resolver)
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+    .build();

-The next two sections of this guide will walk you through tailoring the library's traits to your own needs:
+let ctx = SessionContext::from(state);
+```

-- [Build your own WorkerResolver](worker-resolver.md)
-- [Build your own ChannelResolver](channel-resolver.md)
-- [Build your own TaskEstimator](task-estimator.md)
-- [Build your own distributed DataFusion Worker](worker.md)
+This will leave a DataFusion `SessionContext` ready for executing distributed queries.

-Here are some other resources in the codebase:
+> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.

-- [In-memory cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/in_memory.md)
-- [Localhost cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/localhost.md)
+## Next steps

-A more advanced example can be found in the benchmarks that use a cluster of distributed DataFusion workers
-deployed on AWS EC2 machines:
+Depending on your needs, your setup can get more complicated, for example:

-- [AWS EC2 based cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs)
+- You may want to resolve worker URLs dynamically using the Kubernetes API.
+- You may want to wrap the Arrow Flight clients that connect workers with an observability layer.
+- You may want to be able to execute your own custom ExecutionPlans in a distributed manner.
+- etc.

-Each feature in the project is showcased and tested in its own isolated integration test, so it's recommended to
-review those for a better understanding of how specific features work:
+To learn how to do all that, it's recommended to:

-- [Pass your own ConfigExtension implementations across network boundaries](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_config_extension.rs)
-- [Provide custom protobuf codecs for your own nodes](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_extension_codec.rs)
-- Provide a custom TaskEstimator for controlling the amount of parallelism (coming soon)
+- [Continue reading this guide](worker.md)
+- [Look at examples in the project](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/examples)
+- [Look at the integration tests for finer grained examples](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/tests)
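As an aside, the URL scheme produced by the `LocalhostWorkerResolver` example in this diff can be sketched without the `url` crate. `localhost_urls` is a hypothetical, dependency-free helper showing the strings such a resolver would hand back:

```rust
// Build one worker URL per configured port, mirroring the
// LocalhostWorkerResolver example from the getting-started guide.
// Plain strings stand in for the `url::Url` type used by the real trait.
fn localhost_urls(ports: &[u16]) -> Vec<String> {
    ports
        .iter()
        .map(|port| format!("http://localhost:{port}"))
        .collect()
}

fn main() {
    let urls = localhost_urls(&[8000, 8001, 8002]);
    // One URL per port, in order.
    assert_eq!(urls.len(), 3);
    assert_eq!(urls[0], "http://localhost:8000");
    println!("{urls:?}");
}
```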

docs/source/user-guide/how-a-distributed-plan-is-built.md

Lines changed: 16 additions & 13 deletions
@@ -1,6 +1,6 @@
 # How a Distributed Plan is Built

-This page walks through the steps the distributed DataFusion planner takes to build a distributed query plan.
+This page walks through how the distributed DataFusion planner transforms a query into a distributed execution plan.

 Everything starts with a simple single-node plan, for example:

@@ -29,11 +29,14 @@ The first step is to split the leaf node into different tasks:
 Each task will handle a different non-overlapping piece of data.

 The number of tasks that will be used for executing leaf nodes is determined by a `TaskEstimator` implementation.
-There is one default implementation for a file-based `DataSourceExec`, but since a `DataSourceExec` in DataFusion can
-be anything the user wants to implement, it is the user who should also provide a custom `TaskEstimator` if they have
-a custom `DataSourceExec`.
+A default implementation exists for file-based `DataSourceExec` nodes. However, since `DataSourceExec` can be
+customized to represent any data source, users with custom implementations should also provide a corresponding
+`TaskEstimator`.

-In the case above, a `TaskEstimator` decided to use four tasks for the leaf node.
+In the case above, a `TaskEstimator` decided to use four tasks for the leaf node. Note that even if we are distributing
+the data across different tasks, each task will also distribute its data across partitions using the vanilla DataFusion
+partitioning mechanism. A partition is a split of data processed by a single thread on a single machine, whereas a
+task is a split of data processed by an entire machine within a cluster.

 After that, we can continue reconstructing the plan:

@@ -46,12 +49,11 @@ Let's keep constructing the plan:

 ![img_4.png](../_static/images/img_4.png)

-Things change at this point: a `RepartitionExec` implies that we want to repartition data so that each partition handles
-a non-overlapping set of the grouping keys of the ongoing aggregation.
+At this point, the plan encounters a `RepartitionExec` node, which requires repartitioning data so each partition
+handles a non-overlapping subset of grouping keys for the aggregation.

-If a `RepartitionExec` in a single-node context redistributes data across threads on the same machine, a
-`RepartitionExec` in a distributed context redistributes data across threads on different machines. This means that we
-need to perform a shuffle over the network.
+While `RepartitionExec` redistributes data across threads on a single machine in vanilla DataFusion, it redistributes
+data across threads on different machines in the distributed context—requiring a network shuffle.

 As we are about to send data over the network, it's convenient to coalesce smaller batches into larger ones to avoid
 the overhead of sending many small messages, and instead send fewer but larger messages:

@@ -80,9 +82,10 @@ The rest of the plan can be formed as normal:

 ![img_7.png](../_static/images/img_7.png)

-There's just one last step: the head of the plan is currently spread across two different machines, but we want it
-on one. In the same way that vanilla DataFusion coalesces all partitions into one in the head node for the user, we also
-need to do that, but not only across partitions on a single machine, but across tasks on different machines.
+One final step remains: the plan's head is currently distributed across two machines, but the final result must be
+consolidated on a single node. In the same way that vanilla DataFusion coalesces all partitions into one in the head
+node for the user, we also need to do that, but not only across partitions on a single machine, but across tasks on
+different machines.

 For that, the `NetworkCoalesceExec` network boundary is introduced: it coalesces P partitions across N tasks into
 N*P partitions in one task. This does not imply repartitioning, or shuffling, or anything like that. The partitions
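The N*P partition arithmetic described for `NetworkCoalesceExec` can be sketched as follows. `coalesced_partition` is an illustrative assumption about the index layout (task-major ordering), not the library's actual implementation:

```rust
use std::collections::HashSet;

// Map a (task_index, partition) pair from one of N upstream tasks, each
// exposing `partitions_per_task` partitions, onto a single output partition
// index in the coalescing task. No data moves between partitions — the
// mapping just relabels where each stream appears.
fn coalesced_partition(task_index: usize, partition: usize, partitions_per_task: usize) -> usize {
    task_index * partitions_per_task + partition
}

fn main() {
    let (n_tasks, p) = (2, 4);
    // Every (task, partition) pair maps to a distinct output partition,
    // so N tasks of P partitions become N * P partitions in one task.
    let mut seen = HashSet::new();
    for task in 0..n_tasks {
        for part in 0..p {
            seen.insert(coalesced_partition(task, part, p));
        }
    }
    assert_eq!(seen.len(), n_tasks * p);
    assert_eq!(coalesced_partition(1, 0, 4), 4);
    println!("coalesced into {} partitions", seen.len());
}
```

Because the mapping is a bijection, coalescing needs no repartitioning or shuffling, matching the description above.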
