Merged
Changes from 3 commits
3 changes: 3 additions & 0 deletions docs/source/_static/images/concepts.png
Collaborator


Is `NetworkCoalesceExec` at the top not considered a stage? Do stage indices start at 0 or 1? It could be helpful to make that explicit by writing e.g. `Stage i`, where `i` is the stage index, rather than just `Stage`.

Collaborator Author


> Is NetworkCoalesceExec at the top not considered a stage?

I don't think so. That part of the plan gets executed locally in a non-distributed context, so we usually talk about it as the "head" of the plan. It might be nice to choose a specific term for that though.

Collaborator Author


For stage enumeration, I'd not over-index too much on it in this drawing, as I wanted to land it as clean as possible. There's no strong enumeration-order requirement; fwiw, stage numbers could be random numbers and it would work the same, so I think it's fine not to represent it in this drawing.

5 changes: 2 additions & 3 deletions docs/source/index.rst
@@ -13,13 +13,12 @@ how to contribute changes to the library yourself.
:maxdepth: 2
:caption: User Guide

user-guide/index
user-guide/concepts
user-guide/getting-started
user-guide/worker
user-guide/worker-resolver
user-guide/arrow-flight-endpoint
user-guide/channel-resolver
user-guide/task-estimator
user-guide/concepts
user-guide/how-a-distributed-plan-is-built

.. _toc.contributor-guide:
28 changes: 17 additions & 11 deletions docs/source/user-guide/channel-resolver.md
@@ -1,39 +1,45 @@
# Building a ChannelResolver

This trait is optional, there's a sane default already in place that should work for most simple use cases.
This trait is optional, there's a sane default already in place that should work for most simple cases.

A `ChannelResolver` tells Distributed DataFusion how to build an Arrow Flight client baked by a
[tonic](https://github.com/hyperium/tonic) channel given a worker URL.
[Tonic](https://github.com/hyperium/tonic) channel given a worker URL.

There's a default implementation that simply connects to the given URL, builds the Arrow Flight client instance,
and caches it so that the same client instance gets reused for a query to the same URL.
and caches it so that it gets reused upon reaching the URL again.

However, you might want to provide your own implementation. For that you need to take into account the following
points:
## Providing your own ChannelResolver

To provide your own implementation, you'll need to take into account the following points:

- You will need to provide your own implementation in two places:
- in the `SessionContext` that handles your queries.
- in the `SessionContext` that first initiates and plans your queries.
- while instantiating the `Worker` with the `from_session_builder()` constructor.
- If you decide to build it from scratch, make sure that Arrow Flight clients are reused across
requests rather than always building new ones.
- You can use this library's `DefaultChannelResolver` as a backbone for your own implementation.
If you do that, channel caching will be automatically managed for you.
If you do that, gRPC channel reuse will be automatically managed for you.

```rust
#[derive(Clone)]
struct CustomChannelResolver;

#[async_trait]
impl ChannelResolver for CustomChannelResolver {
    async fn get_flight_client_for_url(
        &self,
        url: &Url,
    ) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
        // Build a custom FlightServiceClient wrapped with tower
        // layers or something similar.
        todo!()
    }
}

async fn main() {
    // Make sure you build just one for the whole lifetime of your application,
    // as it needs to be able to reuse Arrow Flight client instances across
    // different queries.
    let channel_resolver = CustomChannelResolver;

    let state = SessionStateBuilder::new()
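The client-reuse requirement above can be sketched independently of Arrow Flight as a cache keyed by URL. This is a minimal illustration, not the library's implementation; the `Client` struct is a hypothetical stand-in for the real `FlightServiceClient`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real Arrow Flight client.
struct Client {
    url: String,
}

// Cache clients per URL so repeated calls reuse the same instance
// instead of opening a new connection every time.
#[derive(Default)]
struct CachedResolver {
    clients: Mutex<HashMap<String, Arc<Client>>>,
}

impl CachedResolver {
    fn get_client(&self, url: &str) -> Arc<Client> {
        let mut clients = self.clients.lock().unwrap();
        clients
            .entry(url.to_string())
            .or_insert_with(|| Arc::new(Client { url: url.to_string() }))
            .clone()
    }
}
```

Calling `get_client` twice with the same URL returns the same `Arc`, which is the behavior `DefaultChannelResolver` manages for you.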
49 changes: 29 additions & 20 deletions docs/source/user-guide/concepts.md
@@ -1,17 +1,27 @@
# Concepts

This library contains a set of extensions to DataFusion that allow you to run distributed queries.
This library is a collection of DataFusion extensions that allows running distributed queries. You can think of
it as normal DataFusion, with the addition that some nodes are capable of streaming data over the network using
Arrow Flight instead of through in-memory communication.

These are some terms you should be familiar with before getting started:

- `Stage`: a portion of the plan separated by a network boundary from other parts of the plan. A plan contains
one or more stages.
- `Task`: a unit of work in a stage that executes the inner plan in parallel to other tasks within the stage. Each task
in a stage is executed by a different worker.
- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface.
- `Network boundary`: a node in the plan that streams data from a network interface rather than directly from its
children. Implemented as DataFusion `ExecutionPlan`s: `NetworkShuffle` and `NetworkCoalesce`.
- `Subplan`: a slice of the overall plan. Each stage will execute a subplan of the overall plan.
- `Network Boundary`: a node in the plan that streams data from a network interface rather than directly from its
children.
- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface. A task is
executed by exactly one worker, but one worker executes many tasks concurrently.

![concepts.png](../_static/images/concepts.png)

You'll see these concepts mentioned extensively across the documentation and the code itself.
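One way to picture how these terms relate is with toy types (for illustration only, not the library's real API): a plan is split into stages, a stage fans out into tasks, and each task runs on exactly one worker.

```rust
// Toy model of the hierarchy described above. Workers are identified
// here just by their URL.
struct Task {
    worker_url: String,
}

struct Stage {
    tasks: Vec<Task>,
}

// A 2-task stage executed by two different workers (hypothetical URLs).
fn example_stage() -> Stage {
    Stage {
        tasks: vec![
            Task { worker_url: "http://worker-0:8000".to_string() },
            Task { worker_url: "http://worker-1:8000".to_string() },
        ],
    }
}
```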

# Public API

Some other more tangible concepts are the structs and traits exposed publicly; the most important are:

## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)

@@ -21,6 +31,20 @@ a fully formed physical plan and injects the appropriate nodes to execute the query.
It builds the distributed plan from bottom to top, and based on the present nodes in the original plan,
it will inject network boundaries in the appropriate places.

## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)

Arrow Flight server implementation that integrates with the Tonic ecosystem and listens to serialized plans that get
executed over the wire.

Users are expected to build these and spawn them on ports so that the network boundary nodes can reach them.

## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)

Establishes the number of workers available in the distributed DataFusion cluster by returning their URLs.

Different organizations have different needs regarding networking. Some might be using Kubernetes, others might
be using a cloud provider solution, and this trait allows adapting Distributed DataFusion to these different scenarios.
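Stripped of the library's types, the contract is small: given static configuration or service discovery, produce the list of worker URLs. A minimal sketch follows, with plain `String`s standing in for the `Url` type the real trait uses, and a hypothetical static host list as the simplest discovery strategy:

```rust
// Hypothetical static resolver: derive worker URLs from a fixed host
// list. Real implementations would query Kubernetes, a cloud API, etc.
fn worker_urls(hosts: &[&str], port: u16) -> Vec<String> {
    hosts
        .iter()
        .map(|host| format!("http://{host}:{port}"))
        .collect()
}
```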

## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/task_estimator.rs)

Estimates the number of tasks required in the leaf stage of a distributed query.
Expand All @@ -41,22 +65,7 @@ you are in, you might want to return a different set of data.
For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of the
data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.
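The "first 1/3 of the data" arithmetic above can be sketched as a pure function of the task index. This is a hypothetical helper for illustration, not part of the library:

```rust
// Given a task's index, the number of tasks in the stage, and the total
// number of rows, compute the contiguous [start, end) row range this
// task should scan. The ranges tile the data with no gaps or overlaps,
// even when rows don't divide evenly among tasks.
fn task_row_range(task_index: usize, num_tasks: usize, total_rows: usize) -> (usize, usize) {
    let start = task_index * total_rows / num_tasks;
    let end = (task_index + 1) * total_rows / num_tasks;
    (start, end)
}
```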

## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)

Establishes the number of workers available in the distributed DataFusion cluster by returning their URLs.

Different organizations have different needs regarding networking. Some might be using Kubernetes, others might
be using a cloud provider solution, and this trait allows adapting Distributed DataFusion to these different scenarios.

## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/channel_resolver.rs)

Optional extension trait that allows customizing how connections are established to workers. Given one of the
URLs returned by the `WorkerResolver`, it builds an Arrow Flight client ready for serving queries.

## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary.rs)

A network boundary is a node that, instead of pulling data from its children by executing them, serializes them
and sends them over the wire so that they are executed on a remote worker.

As a user of distributed DataFusion, you will not need to interact with this trait directly, but you should know
that different implementations of this trait will be injected into your plans so that queries get distributed.
103 changes: 51 additions & 52 deletions docs/source/user-guide/getting-started.md
@@ -1,88 +1,87 @@
# Getting Started

The easiest way to think about this library is like vanilla DataFusion, with the exception that some nodes
happen to execute their children in remote machines, getting data back through the Arrow Flight protocol.

This library aims to provide an experience as close as possible to vanilla DataFusion.

## How to use Distributed DataFusion

Rather than being opinionated about your setup and how you serve queries to users,
Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as
workers in the cluster.

This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster.
DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster. A very
basic example of this would be:

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let worker = Worker::default();

    Server::builder()
        .add_service(worker.into_flight_server())
        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
        .await?;

    Ok(())
}
```

For a basic setup, all you need to do is to enrich your DataFusion `SessionStateBuilder` with the tools this project
ships:

```rs
let state = SessionStateBuilder::new()
+ .with_distributed_worker_resolver(my_custom_worker_resolver)
+ .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.build();
```

And the `my_custom_worker_resolver` variable should be an implementation of
the [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
trait, which tells Distributed DataFusion how to connect to other workers in the cluster.

A very basic example of such an implementation that resolves workers in the localhost machine is:
Distributed DataFusion needs to know how to reach other workers, so users are expected to implement the `WorkerResolver`
trait for this. Here is a simple example of what this would look like with localhost workers:

```rust
#[derive(Clone)]
struct LocalhostWorkerResolver {
    ports: Vec<u16>,
}

#[async_trait]
impl WorkerResolver for LocalhostWorkerResolver {
    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
        Ok(self
            .ports
            .iter()
            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
            .collect())
    }
}
```

> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.

This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion workers, and it's
also the user of this library's responsibility to spawn a Tonic server that exposes the worker as an Arrow Flight
service using Tonic.

A basic example of such a server is:

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let endpoint = Worker::default();

    Server::builder()
        .add_service(endpoint.into_flight_server())
        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
        .await?;

    Ok(())
}
```

## Next steps

The next two sections of this guide will walk you through tailoring the library's traits to your own needs:

- [Build your own WorkerResolver](worker-resolver.md)
- [Build your own ChannelResolver](channel-resolver.md)
- [Build your own TaskEstimator](task-estimator.md)
- [Build your own distributed DataFusion Worker](worker.md)

Here are some other resources in the codebase:

- [In-memory cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/in_memory.md)
- [Localhost cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/localhost.md)

A more advanced example can be found in the benchmarks that use a cluster of distributed DataFusion workers
deployed on AWS EC2 machines:

- [AWS EC2 based cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs)

Each feature in the project is showcased and tested in its own isolated integration test, so it's recommended to
review those for a better understanding of how specific features work:

- [Pass your own ConfigExtension implementations across network boundaries](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_config_extension.rs)
- [Provide custom protobuf codecs for your own nodes](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_extension_codec.rs)
- Provide a custom TaskEstimator for controlling the amount of parallelism (coming soon)

The `WorkerResolver` implementation, along with the `DistributedPhysicalOptimizerRule`, needs to be provided in
DataFusion's `SessionStateBuilder` so that it's available during the planning step of distributed queries:

```rs
let localhost_worker_resolver = LocalhostWorkerResolver {
    ports: vec![8000, 8001, 8002],
};

let state = SessionStateBuilder::new()
    .with_distributed_worker_resolver(localhost_worker_resolver)
    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
    .build();

let ctx = SessionContext::from(state);
```

This will leave a DataFusion `SessionContext` ready for executing distributed queries.

> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.

## Next steps

Depending on your needs, your setup can get more complicated, for example:

- You may want to resolve worker URLs dynamically using the Kubernetes API.
- You may want to wrap the Arrow Flight clients that connect workers with an observability layer.
- You may want to be able to execute your own custom ExecutionPlans in a distributed manner.
- etc.

To learn how to do all that, it's recommended to:

- [Continue reading this guide](worker.md)
- [Look at examples in the project](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/examples)
- [Look at the integration tests for finer grained examples](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/tests)

16 changes: 0 additions & 16 deletions docs/source/user-guide/index.md

This file was deleted.

5 changes: 3 additions & 2 deletions docs/source/user-guide/worker-resolver.md
@@ -30,7 +30,8 @@ async fn main() {
}
```

> NOTE: It's not necessary to pass this to the Worker session builder.
> NOTE: It's not necessary to pass a WorkerResolver to the Worker session builder; it's only needed on the
> SessionState that initiates and plans the query.

## Static WorkerResolver

@@ -68,5 +69,5 @@ It's up to you to decide how the URLs should be resolved. One important implementation detail is that you might
need to spawn background tasks that periodically refresh the list of available URLs.

A good example can be found
in https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs,
in [benchmarks/cdk/bin/worker.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs),
where a cluster of AWS EC2 machines, identified by tags, is discovered with the AWS Rust SDK.
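The periodic-refresh idea can be sketched as a shared URL list that a background task overwrites whenever service discovery reports a new set of workers. This is a hypothetical shape for illustration, not the library's API:

```rust
use std::sync::{Arc, Mutex};

// Sketch of a worker-URL list that a background task could refresh
// periodically while a `WorkerResolver` implementation reads from it.
#[derive(Clone, Default)]
struct RefreshableUrls {
    urls: Arc<Mutex<Vec<String>>>,
}

impl RefreshableUrls {
    // Called by the background task when discovery (e.g. the AWS SDK
    // or the Kubernetes API) reports a new set of workers.
    fn update(&self, new_urls: Vec<String>) {
        *self.urls.lock().unwrap() = new_urls;
    }

    // What the resolver's get_urls implementation would read from.
    fn get(&self) -> Vec<String> {
        self.urls.lock().unwrap().clone()
    }
}
```

Because the list lives behind an `Arc<Mutex<…>>`, the refresher and the resolver can hold clones of the same handle.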
3 changes: 1 addition & 2 deletions docs/source/user-guide/worker.md
@@ -94,5 +94,4 @@ async fn main() {
}
```

The `into_flight_server()` method wraps the endpoint in a `FlightServiceServer` with high message size limits (
`usize::MAX`) to avoid chunking overhead for internal communication.
The `into_flight_server()` method builds a `FlightServiceServer` ready to be added as a Tonic service.