3 changes: 3 additions & 0 deletions docs/source/_static/images/concepts.png
Contributor:

Is `NetworkCoalesceExec` at the top not considered a stage? Do stage indices start at 0 or 1? It could possibly be helpful to make that explicit by writing e.g. `Stage i`, where `i` is the stage index, rather than just `stage`.

Collaborator (Author):

> Is `NetworkCoalesceExec` at the top not considered a stage?

I don't think so. That part of the plan gets executed locally in a non-distributed context, so we usually talk about it as the "head" of the plan. It might be nice to choose a specific term for that though.

Collaborator (Author):

For stage enumeration, I wouldn't over-index too much on it in this drawing, as I wanted to land it as clean as possible. There's no strong enumeration-order requirement (fwiw, stage numbers could be just random numbers and it would work the same), so I think it's fine to not represent it in this drawing.

5 changes: 2 additions & 3 deletions docs/source/index.rst
@@ -13,13 +13,12 @@ how to contribute changes to the library yourself.
:maxdepth: 2
:caption: User Guide

-   user-guide/index
+   user-guide/concepts
    user-guide/getting-started
    user-guide/worker
    user-guide/worker-resolver
-   user-guide/arrow-flight-endpoint
    user-guide/channel-resolver
    user-guide/task-estimator
-   user-guide/concepts
+   user-guide/how-a-distributed-plan-is-built

.. _toc.contributor-guide:
…
36 changes: 20 additions & 16 deletions docs/source/user-guide/channel-resolver.md
@@ -1,39 +1,43 @@
# Building a ChannelResolver

-This trait is optional, there's a sane default already in place that should work for most simple use cases.
+This trait is optional—a sensible default implementation exists that handles most use cases.

-A `ChannelResolver` tells Distributed DataFusion how to build an Arrow Flight client baked by a
-[tonic](https://github.com/hyperium/tonic) channel given a worker URL.
+The `ChannelResolver` trait controls how Distributed DataFusion builds Arrow Flight clients backed by
+[Tonic](https://github.com/hyperium/tonic) channels for worker URLs.

-There's a default implementation that simply connects to the given URL, builds the Arrow Flight client instance,
-and caches it so that the same client instance gets reused for a query to the same URL.
+The default implementation connects to each URL, builds an Arrow Flight client, and caches it for reuse on
+subsequent requests to the same URL.

-However, you might want to provide your own implementation. For that you need to take into account the following
-points:
+## Providing your own ChannelResolver
+
+To provide your own implementation, you'll need to take the following points into account:

 - You will need to provide your own implementation in two places:
-  - in the `SessionContext` that handles your queries.
+  - in the `SessionContext` that first initiates and plans your queries.
   - while instantiating the `Worker` with the `from_session_builder()` constructor.
-- If you decide to build it from scratch, make sure that Arrow Flight clients are reused across
-  request rather than always building new ones.
-- You can use this library's `DefaultChannelResolver` as a backbone for your own implementation.
-  If you do that, channel caching will be automatically managed for you.
+- If building from scratch, ensure Arrow Flight clients are reused across requests rather than recreated each time.
+- You can extend `DefaultChannelResolver` as a foundation for custom implementations. This automatically handles
+  gRPC channel reuse.
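If you go the `DefaultChannelResolver` route, a minimal sketch of the delegation approach could look like the
following (how `DefaultChannelResolver` is constructed is omitted here; check its actual API):

```rust
#[derive(Clone)]
struct LoggingChannelResolver {
    // Delegates connection management and channel caching to the default.
    inner: DefaultChannelResolver,
}

#[async_trait]
impl ChannelResolver for LoggingChannelResolver {
    async fn get_flight_client_for_url(
        &self,
        url: &Url,
    ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
        // Add custom behavior here (logging, metrics, retries, ...).
        println!("resolving Arrow Flight client for {url}");
        self.inner.get_flight_client_for_url(url).await
    }
}
```

Alternatively, you can implement the trait from scratch: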

 ```rust
 #[derive(Clone)]
 struct CustomChannelResolver;

 #[async_trait]
 impl ChannelResolver for CustomChannelResolver {
-    async fn get_flight_client_for_url(&self, url: &Url) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
-        // Build a custom FlightServiceClient wrapped with tower layers or something similar.
+    async fn get_flight_client_for_url(
+        &self,
+        url: &Url,
+    ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
+        // Build a custom FlightServiceClient wrapped with tower
+        // layers or something similar.
         todo!()
     }
 }

 async fn main() {
-    // Make sure you build just one for the whole lifetime of your application, as it needs to be able to
-    // reuse Arrow Flight client instances across different queries.
+    // Build a single instance for your application's lifetime
+    // to enable Arrow Flight client reuse across queries.
     let channel_resolver = CustomChannelResolver;

     let state = SessionStateBuilder::new()
…
62 changes: 36 additions & 26 deletions docs/source/user-guide/concepts.md
@@ -1,25 +1,50 @@
# Concepts

-This library contains a set of extensions to DataFusion that allow you to run distributed queries.
+This library is a collection of DataFusion extensions that enable distributed query execution. You can think of
+it as normal DataFusion, with the addition that some nodes are capable of streaming data over the network using
+Arrow Flight instead of through in-memory communication.

-These are some terms you should be familiar with before getting started:
+Key terminology:

 - `Stage`: a portion of the plan separated by a network boundary from other parts of the plan. A plan contains
-  one or more stages.
+  one or more stages, each separated by network boundaries.
 - `Task`: a unit of work in a stage that executes the inner plan in parallel to other tasks within the stage. Each task
-  in a stage is executed by a different worker.
-- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface.
-- `Network boundary`: a node in the plan that streams data from a network interface rather than directly from its
-  children. Implemented as DataFusion `ExecutionPlan`s: `NeworkShuffle` and `NetworkCoalesce`.
-- `Subplan`: a slice of the overall plan. Each stage will execute a subplan of the overall plan.
+  in a stage executes a structurally identical plan in a different worker, passing a `task_index` as a contextual value
+  for making choices about what data should be returned.
+- `Network Boundary`: a node in the plan that streams data from a network interface rather than directly from its
+  child nodes.
+- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface. A task is
+  executed by exactly one worker, but one worker executes many tasks concurrently.

![concepts.png](../_static/images/concepts.png)

You'll see these concepts mentioned extensively across the documentation and the code itself.

# Public API

Some other more tangible concepts are the structs and traits exposed publicly; the most important are:

## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)

-This is a physical optimizer rule that converts a single-node DataFusion query into a distributed query. It reads
+A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
 a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.

-It builds the distributed plan from bottom to top, and based on the present nodes in the original plan,
-it will inject network boundaries in the appropriate places.
+It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
+the nodes present in the original plan.
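For intuition, this is roughly the transformation the rule performs on a simple aggregation plan. The sketch below
uses simplified, illustrative node names rather than exact `EXPLAIN` output:

```
Single-node plan:                Distributed plan:

AggregateExec (final)            AggregateExec (final)          <- stage 2, N tasks
  RepartitionExec         ==>      NetworkShuffleExec           <- network boundary
    AggregateExec (partial)          AggregateExec (partial)    <- stage 1, M tasks
      DataSourceExec                   DataSourceExec
```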

+## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)
+
+Arrow Flight server implementation that integrates with the Tonic ecosystem and listens for serialized plans that get
+executed over the wire.
+
+Users are expected to build these and spawn them on ports so that the network boundary nodes can reach them.
+
+## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
+
+Determines the available workers in the Distributed DataFusion cluster by returning their URLs.
+
+Different organizations have different networking requirements—from Kubernetes deployments to cloud provider
+solutions. This trait allows Distributed DataFusion to adapt to various scenarios.

## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/task_estimator.rs)

@@ -41,22 +66,7 @@ you are in, you might want to return a different set of data.
For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of the
data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.
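As a concrete sketch of that arithmetic, a hypothetical helper (not part of the library's API) could derive each
task's slice of the rows like this:

```rust
use std::ops::Range;

/// Hypothetical helper: the half-open row range that task `task_index` out
/// of `num_tasks` should return, splitting `total_rows` into even chunks.
fn task_range(total_rows: usize, task_index: usize, num_tasks: usize) -> Range<usize> {
    let chunk = total_rows.div_ceil(num_tasks);
    let start = (task_index * chunk).min(total_rows);
    let end = ((task_index + 1) * chunk).min(total_rows);
    start..end
}

// With 9 rows and 3 tasks: task 0 -> 0..3, task 1 -> 3..6, task 2 -> 6..9.
```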

-## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-
-Establishes the number of workers available in the distributed DataFusion cluster by returning their URLs.
-
-Different organizations have different needs regarding networking. Some might be using Kubernetes, some other might
-be using a cloud provider solution, and this trait allows adapting Distributed DataFusion to the different scenarios.
-
-## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/channel_resolver.rs)
-
-Optional extension trait that allows to customize how connections are established to workers. Given one of the
-URLs returned by the `WorkerResolver`, it builds an Arrow Flight client ready for serving queries.
-
-## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary.rs)
-
-A network boundary is a node that, instead of pulling data from its children by executing them, serializes them
-and sends them over the wire so that they are executed on a remote worker.
-
-As a user of distributed DataFusion, you will not need to interact with this trait directly, but you should know
-that different implementations of this trait will be injected into your plans so that queries get distributed.
108 changes: 53 additions & 55 deletions docs/source/user-guide/getting-started.md
@@ -1,88 +1,86 @@
# Getting Started

-Rather than being opinionated about your setup and how you serve queries to users,
-Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as
-workers in the cluster.
+Think of this library as vanilla DataFusion, except that certain nodes execute their children on remote
+machines and retrieve data via the Arrow Flight protocol.

-This library aims to provide an experience as close as possible to vanilla DataFusion.
+## How to use Distributed DataFusion
+
+Rather than imposing constraints on your infrastructure or query serving patterns, Distributed DataFusion
+allows you to plug in your own networking stack and spawn your own gRPC servers that act as workers in the cluster.

 This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
 Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
-DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster.
+DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster. A very
+basic example of this would be:

-For a basic setup, all you need to do is to enrich your DataFusion `SessionStateBuilder` with the tools this project
-ships:
-
-```rs
-let state = SessionStateBuilder::new()
-+ .with_distributed_worker_resolver(my_custom_worker_resolver)
-+ .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
-    .build();
-```
-
-And the `my_custom_worker_resolver` variable should be an implementation of
-the [WorkerResolverResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-trait, which tells Distributed DataFusion how to connect to other workers in the cluster.
+```rust
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let worker = Worker::default();
+
+    Server::builder()
+        .add_service(worker.into_flight_server())
+        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
+        .await?;
+
+    Ok(())
+}
+```

-A very basic example of such an implementation that resolves workers in the localhost machine is:
+Distributed DataFusion requires knowledge of worker locations. Implement the `WorkerResolver` trait to provide
+this information. Here is a simple example of what this would look like with localhost workers:

 ```rust
 #[derive(Clone)]
 struct LocalhostWorkerResolver {
     ports: Vec<u16>,
 }

 #[async_trait]
-impl ChannelResolver for LocalhostChannelResolver {
+impl WorkerResolver for LocalhostWorkerResolver {
     fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
-        Ok(self.ports.iter().map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap()).collect())
+        Ok(self
+            .ports
+            .iter()
+            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
+            .collect())
     }
 }
 ```

-> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
-
-This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion workers, and it's
-also the user of this library's responsibility to spawn a Tonic server that exposes the worker as an Arrow Flight
-service using Tonic.
-
-A basic example of such a server is:
-
-```rust
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    let endpoint = Worker::default();
-
-    Server::builder()
-        .add_service(endpoint.into_flight_server())
-        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
-        .await?;
-
-    Ok(())
-}
-```
-
-## Next steps
-
-The next two sections of this guide will walk you through tailoring the library's traits to your own needs:
-
-- [Build your own WorkerResolver](worker-resolver.md)
-- [Build your own ChannelResolver](channel-resolver.md)
-- [Build your own TaskEstimator](task-estimator.md)
-- [Build your own distributed DataFusion Worker](worker.md)
-
-Here are some other resources in the codebase:
-
-- [In-memory cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/in_memory.md)
-- [Localhost cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/localhost.md)
-
-A more advanced example can be found in the benchmarks that use a cluster of distributed DataFusion workers
-deployed on AWS EC2 machines:
-
-- [AWS EC2 based cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs)
-
-Each feature in the project is showcased and tested in its own isolated integration test, so it's recommended to
-review those for a better understanding of how specific features work:
-
-- [Pass your own ConfigExtension implementations across network boundaries](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_config_extension.rs)
-- [Provide custom protobuf codecs for your own nodes](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_extension_codec.rs)
-- Provide a custom TaskEstimator for controlling the amount of parallelism (coming soon)
+Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
+`SessionStateBuilder` to enable distributed query planning:
+
+```rs
+let localhost_worker_resolver = LocalhostWorkerResolver {
+    ports: vec![8000, 8001, 8002],
+};
+
+let state = SessionStateBuilder::new()
+    .with_distributed_worker_resolver(localhost_worker_resolver)
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+    .build();
+
+let ctx = SessionContext::from(state);
+```
+
+This will leave a DataFusion `SessionContext` ready for executing distributed queries.
+
+> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
+
+## Next steps
+
+Depending on your needs, your setup can get more complicated, for example:
+
+- You may want to resolve worker URLs dynamically using the Kubernetes API.
+- You may want to wrap the Arrow Flight clients that connect workers with an observability layer.
+- You may want to be able to execute your own custom ExecutionPlans in a distributed manner.
+- etc...
+
+To learn how to do all that, it's recommended to:
+
+- [Continue reading this guide](worker.md)
+- [Look at examples in the project](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/examples)
+- [Look at the integration tests for finer grained examples](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/tests)
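From here, queries go through the standard DataFusion API and are distributed transparently during physical
planning. A minimal sketch (the table name is illustrative and assumed to be registered elsewhere):

```rust
// Plan and execute a query on the distributed-enabled context. The
// DistributedPhysicalOptimizerRule rewrites the physical plan so parts of
// it run on the workers returned by the WorkerResolver.
let df = ctx.sql("SELECT count(*) FROM my_table").await?;
df.show().await?;
```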

29 changes: 16 additions & 13 deletions docs/source/user-guide/how-a-distributed-plan-is-built.md
@@ -1,6 +1,6 @@
# How a Distributed Plan is Built

-This page walks through the steps the distributed DataFusion planner takes to build a distributed query plan.
+This page walks through how the distributed DataFusion planner transforms a query into a distributed execution plan.

Everything starts with a simple single-node plan, for example:

@@ -29,11 +29,14 @@ The first step is to split the leaf node into different tasks:
Each task will handle a different non-overlapping piece of data.

The number of tasks that will be used for executing leaf nodes is determined by a `TaskEstimator` implementation.
-There is one default implementation for a file-based `DataSourceExec`, but since a `DataSourceExec` in DataFusion can
-be anything the user wants to implement, it is the user who should also provide a custom `TaskEstimator` if they have
-a custom `DataSourceExec`.
+A default implementation exists for file-based `DataSourceExec` nodes. However, since `DataSourceExec` can be
+customized to represent any data source, users with custom implementations should also provide a corresponding
+`TaskEstimator`.

-In the case above, a `TaskEstimator` decided to use four tasks for the leaf node.
+In the case above, a `TaskEstimator` decided to use four tasks for the leaf node. Note that even if we are distributing
+the data across different tasks, each task will also distribute its data across partitions using the vanilla DataFusion
+partitioning mechanism. A partition is a split of data processed by a single thread on a single machine, whereas a
+task is a split of data processed by an entire machine within a cluster.
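To make the task/partition distinction concrete (the numbers are purely illustrative):

```rust
// 4 tasks (one per machine), each splitting its share of the data into
// 8 partitions (one per thread), yields 4 * 8 = 32 parallel streams
// across the cluster.
let tasks: usize = 4;
let partitions_per_task: usize = 8;
assert_eq!(tasks * partitions_per_task, 32);
```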

After that, we can continue reconstructing the plan:

@@ -46,12 +49,11 @@ Let's keep constructing the plan:

![img_4.png](../_static/images/img_4.png)

-Things change at this point: a `RepartitionExec` implies that we want to repartition data so that each partition handles
-a non-overlapping set of the grouping keys of the ongoing aggregation.
+At this point, the plan encounters a `RepartitionExec` node, which requires repartitioning data so each partition
+handles a non-overlapping subset of grouping keys for the aggregation.

-If a `RepartitionExec` in a single-node context redistributes data across threads on the same machine, a
-`RepartitionExec` in a distributed context redistributes data across threads on different machines. This means that we
-need to perform a shuffle over the network.
+While `RepartitionExec` redistributes data across threads on a single machine in vanilla DataFusion, it redistributes
+data across threads on different machines in the distributed context—requiring a network shuffle.
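Conceptually, the shuffle routes each row by the hash of its grouping key to both a target task (machine) and a
target partition (thread) within that task. This sketch shows the general idea of hash partitioning, not the
library's actual routing code:

```rust
/// Illustrative only: map a row's key hash to a (task, partition) target.
fn shuffle_target(key_hash: u64, num_tasks: u64, partitions_per_task: u64) -> (u64, u64) {
    let global = key_hash % (num_tasks * partitions_per_task);
    (global / partitions_per_task, global % partitions_per_task)
}
```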

As we are about to send data over the network, it's convenient to coalesce smaller batches into larger ones to avoid
the overhead of sending many small messages, and instead send fewer but larger messages:
@@ -80,9 +82,10 @@ The rest of the plan can be formed as normal:

![img_7.png](../_static/images/img_7.png)

-There's just one last step: the head of the plan is currently spread across two different machines, but we want it
-on one. In the same way that vanilla DataFusion coalesces all partitions into one in the head node for the user, we also
-need to do that, but not only across partitions on a single machine, but across tasks on different machines.
+One final step remains: the plan's head is currently distributed across two machines, but the final result must be
+consolidated on a single node. In the same way that vanilla DataFusion coalesces all partitions into one in the head
+node for the user, we also need to do that, but not only across partitions on a single machine, but across tasks on
+different machines.

For that, the `NetworkCoalesceExec` network boundary is introduced: it coalesces P partitions across N tasks into
N*P partitions in one task. This does not imply repartitioning, or shuffling, or anything like that. The partitions
…