diff --git a/docs/source/_static/images/concepts.png b/docs/source/_static/images/concepts.png
new file mode 100644
index 00000000..60c89f0c
--- /dev/null
+++ b/docs/source/_static/images/concepts.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:3714f57b3815db2004a1f6445298256b3f63cac9882e5186f901357e688fcbee
+size 262763
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 1cea0739..ea80de49 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -13,13 +13,12 @@ how to contribute changes to the library yourself.
    :maxdepth: 2
    :caption: User Guide

-   user-guide/index
+   user-guide/concepts
    user-guide/getting-started
+   user-guide/worker
    user-guide/worker-resolver
-   user-guide/arrow-flight-endpoint
    user-guide/channel-resolver
    user-guide/task-estimator
-   user-guide/concepts
    user-guide/how-a-distributed-plan-is-built

 .. _toc.contributor-guide:
diff --git a/docs/source/user-guide/channel-resolver.md b/docs/source/user-guide/channel-resolver.md
index c984459b..749cabf0 100644
--- a/docs/source/user-guide/channel-resolver.md
+++ b/docs/source/user-guide/channel-resolver.md
@@ -1,23 +1,23 @@
 # Building a ChannelResolver

-This trait is optional, there's a sane default already in place that should work for most simple use cases.
+This trait is optional—a sensible default implementation exists that handles most use cases.

-A `ChannelResolver` tells Distributed DataFusion how to build an Arrow Flight client baked by a
-[tonic](https://github.com/hyperium/tonic) channel given a worker URL.
+The `ChannelResolver` trait controls how Distributed DataFusion builds Arrow Flight clients backed by
+[Tonic](https://github.com/hyperium/tonic) channels for worker URLs.

-There's a default implementation that simply connects to the given URL, builds the Arrow Flight client instance,
-and caches it so that the same client instance gets reused for a query to the same URL.
+The default implementation connects to each URL, builds an Arrow Flight client, and caches it for reuse on
+subsequent requests to the same URL.

-However, you might want to provide your own implementation. For that you need to take into account the following
-points:
+## Providing your own ChannelResolver
+
+To provide your own implementation, take the following points into account:

 - You will need to provide your own implementation in two places:
-    - in the `SessionContext` that handles your queries.
+    - in the `SessionContext` that first initiates and plans your queries.
     - while instantiating the `Worker` with the `from_session_builder()` constructor.
-- If you decide to build it from scratch, make sure that Arrow Flight clients are reused across
-  request rather than always building new ones.
-- You can use this library's `DefaultChannelResolver` as a backbone for your own implementation.
-  If you do that, channel caching will be automatically managed for you.
+- If building from scratch, ensure Arrow Flight clients are reused across requests rather than recreated each time.
+- You can extend `DefaultChannelResolver` as a foundation for custom implementations. This automatically handles
+  gRPC channel reuse.

 ```rust
 #[derive(Clone)]
@@ -25,15 +25,19 @@ struct CustomChannelResolver;

 #[async_trait]
 impl ChannelResolver for CustomChannelResolver {
-    async fn get_flight_client_for_url(&self, url: &Url) -> Result<FlightServiceClient<Channel>, DataFusionError> {
-        // Build a custom FlightServiceClient wrapped with tower layers or something similar.
+    async fn get_flight_client_for_url(
+        &self,
+        url: &Url,
+    ) -> Result<FlightServiceClient<Channel>, DataFusionError> {
+        // Build a custom FlightServiceClient wrapped with tower
+        // layers or something similar.
         todo!()
     }
 }

 async fn main() {
-    // Make sure you build just one for the whole lifetime of your application, as it needs to be able to
-    // reuse Arrow Flight client instances across different queries.
+    // Build a single instance for your application's lifetime
+    // to enable Arrow Flight client reuse across queries.
     let channel_resolver = CustomChannelResolver;

     let state = SessionStateBuilder::new()
diff --git a/docs/source/user-guide/concepts.md b/docs/source/user-guide/concepts.md
index d5e81007..88cbd0aa 100644
--- a/docs/source/user-guide/concepts.md
+++ b/docs/source/user-guide/concepts.md
@@ -1,25 +1,50 @@
 # Concepts

-This library contains a set of extensions to DataFusion that allow you to run distributed queries.
+This library is a collection of DataFusion extensions that enable distributed query execution. You can think of
+it as normal DataFusion, with the addition that some nodes are capable of streaming data over the network using
+Arrow Flight instead of through in-memory communication.

-These are some terms you should be familiar with before getting started:
+Key terminology:

 - `Stage`: a portion of the plan separated by a network boundary from other parts of the plan. A plan contains
-  one or more stages.
+  one or more stages, each separated by network boundaries.
 - `Task`: a unit of work in a stage that executes the inner plan in parallel to other tasks within the stage. Each task
-  in a stage is executed by a different worker.
-- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface.
-- `Network boundary`: a node in the plan that streams data from a network interface rather than directly from its
-  children. Implemented as DataFusion `ExecutionPlan`s: `NeworkShuffle` and `NetworkCoalesce`.
-- `Subplan`: a slice of the overall plan. Each stage will execute a subplan of the overall plan.
+  in a stage executes a structurally identical plan in a different worker, receiving a `task_index` as a contextual
+  value used to decide what data should be returned.
+- `Network Boundary`: a node in the plan that streams data from a network interface rather than directly from its
+  child nodes.
+- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface. A task is
+  executed by exactly one worker, but one worker executes many tasks concurrently.
+
+![concepts.png](../_static/images/concepts.png)
+
+You'll see these concepts mentioned extensively across the documentation and the code itself.
+
+# Public API
+
+Other, more tangible concepts are the structs and traits exposed publicly. The most important ones are:

 ## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)

-This is a physical optimizer rule that converts a single-node DataFusion query into a distributed query. It reads
+A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
 a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.

-It builds the distributed plan from bottom to top, and based on the present nodes in the original plan,
-it will inject network boundaries in the appropriate places.
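+It is registered like any other physical optimizer rule. A minimal sketch (the full setup is covered in
+[Getting Started](getting-started.md)):
+
+```rust
+let state = SessionStateBuilder::new()
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+    .build();
+```
+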
+It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
+the nodes present in the original plan.
+
+## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)
+
+An Arrow Flight server implementation that integrates with the Tonic ecosystem and listens for serialized plans
+sent over the wire, executing them on arrival.
+
+Users are expected to build these and spawn them on ports so that the network boundary nodes can reach them.
+
+## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
+
+Determines the available workers in the Distributed DataFusion cluster by returning their URLs.
+
+Different organizations have different networking requirements—from Kubernetes deployments to cloud provider
+solutions. This trait allows Distributed DataFusion to adapt to various scenarios.

 ## [TaskEstimator](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/task_estimator.rs)
@@ -41,22 +66,7 @@ you are in, you might want to return a different set of data.
 For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of
 the data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.

-## [WorkerResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-
-Establishes the number of workers available in the distributed DataFusion cluster by returning their URLs.
-
-Different organizations have different needs regarding networking. Some might be using Kubernetes, some other might
-be using a cloud provider solution, and this trait allows adapting Distributed DataFusion to the different scenarios.
-
 ## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/channel_resolver.rs)

 Optional extension trait that allows to customize how connections are established to workers. Given one of the URLs
 returned by the `WorkerResolver`, it builds an Arrow Flight client ready for serving queries.
-
-## [NetworkBoundary](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary.rs)
-
-A network boundary is a node that, instead of pulling data from its children by executing them, serializes them
-and sends them over the wire so that they are executed on a remote worker.
-
-As a user of distributed DataFusion, you will not need to interact with this trait directly, but you should know
-that different implementations of this trait will be injected into your plans so that queries get distributed.
diff --git a/docs/source/user-guide/getting-started.md b/docs/source/user-guide/getting-started.md
index 0682180c..33f5fa02 100644
--- a/docs/source/user-guide/getting-started.md
+++ b/docs/source/user-guide/getting-started.md
@@ -1,28 +1,36 @@
 # Getting Started

-Rather than being opinionated about your setup and how you serve queries to users,
-Distributed DataFusion allows you to plug in your own networking stack and spawn your own gRPC servers that act as
-workers in the cluster.
+Think of this library as vanilla DataFusion, except that certain nodes execute their children on remote
+machines and retrieve data via the Arrow Flight protocol.
+
+This library aims to provide an experience as close as possible to vanilla DataFusion.
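+
+Once a cluster is wired up (shown below), issuing a query is indistinguishable from single-node DataFusion.
+A minimal sketch, assuming `ctx` is a `SessionContext` configured as described in this guide and `my_table`
+is a registered table:
+
+```rust
+// The DistributedPhysicalOptimizerRule turns the physical plan into a
+// distributed one transparently during planning; execution looks the same.
+let df = ctx.sql("SELECT count(*) FROM my_table").await?;
+df.show().await?;
+```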
+
+## How to use Distributed DataFusion
+
+Rather than imposing constraints on your infrastructure or query serving patterns, Distributed DataFusion
+allows you to plug in your own networking stack and spawn your own gRPC servers that act as workers in the cluster.

 This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
 Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
-DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster.
+DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster. A very
+basic example of this would be:

-For a basic setup, all you need to do is to enrich your DataFusion `SessionStateBuilder` with the tools this project
-ships:
+```rust
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let worker = Worker::default();

-```rs
-    let state = SessionStateBuilder::new()
-+       .with_distributed_worker_resolver(my_custom_worker_resolver)
-+       .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
-        .build();
-```
+    Server::builder()
+        .add_service(worker.into_flight_server())
+        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
+        .await?;

-And the `my_custom_worker_resolver` variable should be an implementation of
-the [WorkerResolverResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/worker_resolver.rs)
-trait, which tells Distributed DataFusion how to connect to other workers in the cluster.
+    Ok(())
+}
+```

-A very basic example of such an implementation that resolves workers in the localhost machine is:
+Distributed DataFusion requires knowledge of worker locations. Implement the `WorkerResolver` trait to provide
+this information. Here is a simple example of what this would look like with localhost workers:

 ```rust
 #[derive(Clone)]
@@ -30,59 +38,49 @@ struct LocalhostWorkerResolver {
     ports: Vec<u16>,
 }

-#[async_trait]
-impl ChannelResolver for LocalhostChannelResolver {
+impl WorkerResolver for LocalhostWorkerResolver {
     fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
-        Ok(self.ports.iter().map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap()).collect())
+        Ok(self
+            .ports
+            .iter()
+            .map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
+            .collect())
     }
 }
 ```

-> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
-
-This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion workers, and it's
-also the user of this library's responsibility to spawn a Tonic server that exposes the worker as an Arrow Flight
-service using Tonic.
+Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
+`SessionStateBuilder` to enable distributed query planning:

-A basic example of such a server is:
-
-```rust
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let endpoint = Worker::default();
-
-    Server::builder()
-        .add_service(endpoint.into_flight_server())
-        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
-        .await?;
-
-    Ok(())
-}
-```
-
-## Next steps
+```rust
+let localhost_worker_resolver = LocalhostWorkerResolver {
+    ports: vec![8000, 8001, 8002],
+};
+
+let state = SessionStateBuilder::new()
+    .with_distributed_worker_resolver(localhost_worker_resolver)
+    .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+    .build();
+
+let ctx = SessionContext::from(state);
+```

-The next two sections of this guide will walk you through tailoring the library's traits to your own needs:
+This will leave a DataFusion `SessionContext` ready for executing distributed queries.

-- [Build your own WorkerResolver](worker-resolver.md)
-- [Build your own ChannelResolver](channel-resolver.md)
-- [Build your own TaskEstimator](task-estimator.md)
-- [Build your own distributed DataFusion Worker](worker.md)
+> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.

-Here are some other resources in the codebase:
+## Next steps

-- [In-memory cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/in_memory.md)
-- [Localhost cluster example](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/localhost.md)
+Depending on your needs, your setup can get more complicated, for example:

-A more advanced example can be found in the benchmarks that use a cluster of distributed DataFusion workers
-deployed on AWS EC2 machines:
+- You may want to resolve worker URLs dynamically using the Kubernetes API (see the sketch below).
+- You may want to wrap the Arrow Flight clients that connect workers with an observability layer.
+- You may want to be able to execute your own custom ExecutionPlans in a distributed manner.
+- etc...
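+
+For instance, a minimal sketch of a dynamic `WorkerResolver` (hypothetical: the refresh logic, e.g. polling the
+Kubernetes API, is assumed to run in a background task that keeps the shared list up to date):
+
+```rust
+use std::sync::{Arc, RwLock};
+
+#[derive(Clone)]
+struct DynamicWorkerResolver {
+    // Written periodically by a background refresh task; get_urls() just
+    // reads the latest snapshot, since planning is synchronous.
+    urls: Arc<RwLock<Vec<Url>>>,
+}
+
+impl WorkerResolver for DynamicWorkerResolver {
+    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
+        Ok(self.urls.read().unwrap().clone())
+    }
+}
+```
+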
-Each feature in the project is showcased and tested in its own isolated integration test, so it's recommended to -review those for a better understanding of how specific features work: +To learn how to do all that, it's recommended to: -- [Pass your own ConfigExtension implementations across network boundaries](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_config_extension.rs) -- [Provide custom protobuf codecs for your own nodes](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/custom_extension_codec.rs) -- Provide a custom TaskEstimator for controlling the amount of parallelism (coming soon) +- [Continue reading this guide](worker.md) +- [Look at examples in the project](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/examples) +- [Look at the integration tests for finer grained examples](https://github.com/datafusion-contrib/datafusion-distributed/tree/main/tests) diff --git a/docs/source/user-guide/how-a-distributed-plan-is-built.md b/docs/source/user-guide/how-a-distributed-plan-is-built.md index c872fd53..7b1efbd5 100644 --- a/docs/source/user-guide/how-a-distributed-plan-is-built.md +++ b/docs/source/user-guide/how-a-distributed-plan-is-built.md @@ -1,6 +1,6 @@ # How a Distributed Plan is Built -This page walks through the steps the distributed DataFusion planner takes to build a distributed query plan. +This page walks through how the distributed DataFusion planner transforms a query into a distributed execution plan. Everything starts with a simple single-node plan, for example: @@ -29,11 +29,14 @@ The first step is to split the leaf node into different tasks: Each task will handle a different non-overlapping piece of data. The number of tasks that will be used for executing leaf nodes is determined by a `TaskEstimator` implementation. -There is one default implementation for a file-based `DataSourceExec`, but since a `DataSourceExec` in DataFusion can -be anything the user wants to implement, it is the user who should also provide a custom `TaskEstimator` if they have -a custom `DataSourceExec`. +A default implementation exists for file-based `DataSourceExec` nodes. However, since `DataSourceExec` can be +customized to represent any data source, users with custom implementations should also provide a corresponding +`TaskEstimator`. -In the case above, a `TaskEstimator` decided to use four tasks for the leaf node. +In the case above, a `TaskEstimator` decided to use four tasks for the leaf node. Note that even if we are distributing +the data across different tasks, each task will also distribute its data across partitions using the vanilla DataFusion +partitioning mechanism. A partition is a split of data processed by a single thread on a single machine, whereas a +task is a split of data processed by an entire machine within a cluster. After that, we can continue reconstructing the plan: @@ -46,12 +49,11 @@ Let's keep constructing the plan: ![img_4.png](../_static/images/img_4.png) -Things change at this point: a `RepartitionExec` implies that we want to repartition data so that each partition handles -a non-overlapping set of the grouping keys of the ongoing aggregation. +At this point, the plan encounters a `RepartitionExec` node, which requires repartitioning data so each partition +handles a non-overlapping subset of grouping keys for the aggregation. 
-If a `RepartitionExec` in a single-node context redistributes data across threads on the same machine, a -`RepartitionExec` in a distributed context redistributes data across threads on different machines. This means that we -need to perform a shuffle over the network. +While `RepartitionExec` redistributes data across threads on a single machine in vanilla DataFusion, it redistributes +data across threads on different machines in the distributed context—requiring a network shuffle. As we are about to send data over the network, it's convenient to coalesce smaller batches into larger ones to avoid the overhead of sending many small messages, and instead send fewer but larger messages: @@ -80,9 +82,10 @@ The rest of the plan can be formed as normal: ![img_7.png](../_static/images/img_7.png) -There's just one last step: the head of the plan is currently spread across two different machines, but we want it -on one. In the same way that vanilla DataFusion coalesces all partitions into one in the head node for the user, we also -need to do that, but not only across partitions on a single machine, but across tasks on different machines. +One final step remains: the plan's head is currently distributed across two machines, but the final result must be +consolidated on a single node. In the same way that vanilla DataFusion coalesces all partitions into one in the head +node for the user, we also need to do that, but not only across partitions on a single machine, but across tasks on +different machines. For that, the `NetworkCoalesceExec` network boundary is introduced: it coalesces P partitions across N tasks into N*P partitions in one task. This does not imply repartitioning, or shuffling, or anything like that. The partitions diff --git a/docs/source/user-guide/index.md b/docs/source/user-guide/index.md deleted file mode 100644 index f4d4022d..00000000 --- a/docs/source/user-guide/index.md +++ /dev/null @@ -1,16 +0,0 @@ -# Index - -Distributed DataFusion is a library that brings distributed capabilities to DataFusion. -It provides a set of execution plans, optimization rules, configuration extensions, and new traits -to enable distributed execution. - -This user guide will walk you through using the tools in this project to set up -your own distributed DataFusion cluster. - -- [Concepts](concepts.md) -- [Getting Started](getting-started.md) -- [Building a WorkerResolver](worker-resolver.md) -- [Spawning a Worker](worker.md) -- [Building a ChannelResolver](channel-resolver.md) -- [Building a TaskEstimator](task-estimator.md) -- [How a distributed plan is built](how-a-distributed-plan-is-built.md) diff --git a/docs/source/user-guide/task-estimator.md b/docs/source/user-guide/task-estimator.md index 77eaacb1..b2d42f4d 100644 --- a/docs/source/user-guide/task-estimator.md +++ b/docs/source/user-guide/task-estimator.md @@ -1,15 +1,14 @@ # Building a TaskEstimator -A `TaskEstimator` is a trait that tells the distributed planner how many distributed tasks should be used in the -different stages of the plan. +The `TaskEstimator` trait controls how many distributed tasks the planner allocates to each stage of the query plan. -The number of tasks is assigned to the different stages in a bottom-to-top fashion. You can refer the +The number of tasks is assigned to the different stages in a bottom-up fashion. 
You can refer to the
 [Plan Annotation docs](https://github.com/datafusion-contrib/datafusion-distributed/blob/75b4e73e9052c6596b9d1744ce2bdfa6cbc010d3/src/distributed_planner/plan_annotator.rs)
 for an explanation on how this works. A `TaskEstimator` is what hints this process how many tasks should be used.

-There is a default implementation that acts on `DataSourceExec` nodes baked by `FileScanConfig`s, however, as a user,
-you may want to provide your own `TaskEstimator` implementation for your own `ExecutionPlan`s.
+While a default implementation exists for file-based `DataSourceExec` nodes (those backed by `FileScanConfig`), you
+can provide custom `TaskEstimator` implementations for your own `ExecutionPlan` types.

 ## Providing your own TaskEstimator

@@ -19,12 +18,13 @@ Providing a `TaskEstimator` allows you to do two things:
 2. Tell the distributed planner how to "scale up" your `ExecutionPlan` in order to account for it running in
    multiple distributed tasks.

-Note that if your custom nodes are going to run distributed, you need to account for it at execution time.
-If you build a `TaskEstimator` that tells the distributed planner that your node should run in N tasks, then
-you need to react to the presence of a
-[DistributedTaskCtx](https://github.com/datafusion-contrib/datafusion-distributed/blob/75b4e73e9052c6596b9d1744ce2bdfa6cbc010d3/src/stage.rs#L137-L137)
-during execution.
+If your custom nodes will execute in a distributed manner, you must handle this during execution. When your
+`TaskEstimator` specifies N tasks for a node, your execution logic must respond to the
+[DistributedTaskContext](https://github.com/datafusion-contrib/datafusion-distributed/blob/75b4e73e9052c6596b9d1744ce2bdfa6cbc010d3/src/stage.rs#L137-L137)
+present in DataFusion's `TaskContext` to determine which subset of data this task should process.

 There's an example of how to do that in the `examples/` folder:

-- TODO: link to example
+- [custom_execution_plan.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/custom_execution_plan.rs) -
+  A complete example showing how to implement a custom execution plan (`numbers(start, end)` table function)
+  that works with distributed DataFusion, including a custom codec and TaskEstimator.
diff --git a/docs/source/user-guide/worker-resolver.md b/docs/source/user-guide/worker-resolver.md
index 8af97362..a17e5915 100644
--- a/docs/source/user-guide/worker-resolver.md
+++ b/docs/source/user-guide/worker-resolver.md
@@ -1,13 +1,12 @@
-# Building a WorkerResolver
+# Implementing a WorkerResolver

-A `WorkerResolver` tells distributed DataFusion the location (URLs) of your worker nodes. This information is
-used in two different places:
+The `WorkerResolver` trait provides Distributed DataFusion with the locations (URLs) of your worker nodes. This
+information is used in two different places:

-1. For planning: during distributed planning, the amount of worker nodes available (essentially, `Vec<Url>`.length()),
-   is used for determined how to scale up the plan. The distributed planner will not use more tasks per stage than the
-   amount of workers the cluster has.
-2. Right before execution: each task in a distributed plan contains slots that the `DistributedExec` node needs
-   to fill right before execution (with the URLs of the workers as updated as possible).
+1. **During planning**: The number of available workers (i.e., `Vec<Url>::len()`) determines how the plan scales.
+   The planner will not allocate more tasks per stage than available workers.
+2. **Before execution**: Each task in a distributed plan needs its worker URL assignment populated right before
+   execution.

 You need to pass your own `WorkerResolver` to DataFusion's `SessionStateBuilder` so that it's available in the
 `SessionContext`:

@@ -30,11 +29,12 @@ async fn main() {
 }
 ```

-> NOTE: It's not necessary to pass this to the Worker session builder.
+> NOTE: It's not necessary to pass a WorkerResolver to the Worker session builder; it's only necessary on the
+> SessionState that initiates and plans the query.

 ## Static WorkerResolver

-This is the simplest thing you can do, although it will not fit some common use cases. An example of this can be
+This is the simplest approach, though it doesn't accommodate dynamic worker discovery. An example of this can be
 seen in the
 [localhost_worker.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/fad9fa222d65b7d2ddae92fbc20082b5c434e4ff/examples/localhost_run.rs)
 example:

@@ -64,9 +64,10 @@ provider for hosting your Distributed DataFusion workers.

 It's up to you to decide how the URLs should be resolved. One important implementation note is:

-- Retrieving the worker URLs needs to be synchronous (as planning is synchronous), and therefore, you'll likely
-  need to spawn background tasks that periodically refresh the list of available URLs.
+- Since planning is synchronous, `get_urls()` must return immediately. For dynamic worker discovery, spawn
+  background tasks that periodically refresh the worker URL list, storing results that `get_urls()` can access
+  synchronously.

 A good example can be found
-in https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs,
+in [benchmarks/cdk/bin/worker.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs),
 where a cluster of AWS EC2 machines, identified by tags, is discovered with the AWS Rust SDK.
\ No newline at end of file
diff --git a/docs/source/user-guide/worker.md b/docs/source/user-guide/worker.md
index 573d36c4..e57bc329 100644
--- a/docs/source/user-guide/worker.md
+++ b/docs/source/user-guide/worker.md
@@ -1,7 +1,7 @@
 # Spawn a Worker

-A `Worker` is a gRPC server that implements the Arrow Flight protocol for distributed query execution.
-Worker nodes run these endpoints to receive execution plans, execute them, and stream results back.
+The `Worker` is a gRPC server implementing the Arrow Flight protocol for distributed query execution. Worker nodes
+run these endpoints to receive execution plans, execute them, and stream results back.

 ## Overview

@@ -14,7 +14,7 @@ The `Worker` is the core worker component in Distributed DataFusion. It:

 ## Launching the Arrow Flight server

-The `Default` implementation of the `Worker` should satisfy most basic use cases
+The default `Worker` implementation satisfies most basic use cases:

 ```rust
 use datafusion_distributed::Worker;
@@ -31,8 +31,7 @@ async fn main() {
     }
 }

-If you are using DataFusion, though, it's very likely that you have your own custom UDFs, execution nodes, config
-options, etc...
+However, most DataFusion deployments include custom UDFs, execution nodes, or configuration options.

 You'll need to tell the `Worker` how to build your DataFusion sessions:

@@ -56,7 +55,7 @@ async fn main() {
     }
 }

-### WorkerSessionBuilder
+## WorkerSessionBuilder

 The `WorkerSessionBuilder` is a closure or type that implements:

@@ -73,7 +72,8 @@ pub trait WorkerSessionBuilder {
 It receives a `WorkerQueryContext` containing:

 - `SessionStateBuilder`: A pre-populated session state builder in which you can inject your custom stuff
-- `headers`: HTTP headers from the incoming request (useful for passing metadata)
+- `headers`: HTTP headers from the incoming request (useful for passing metadata like authentication tokens or
+  configuration)

 ## Serving the Endpoint

@@ -94,5 +94,4 @@ async fn main() {
     }
 }

-The `into_flight_server()` method wraps the endpoint in a `FlightServiceServer` with high message size limits (
-`usize::MAX`) to avoid chunking overhead for internal communication.
+The `into_flight_server()` method builds a `FlightServiceServer` ready to be added as a Tonic service.
diff --git a/examples/custom_execution_plan.md b/examples/custom_execution_plan.md
new file mode 100644
index 00000000..49f2366a
--- /dev/null
+++ b/examples/custom_execution_plan.md
@@ -0,0 +1,148 @@
+# Custom Execution Plan Example
+
+Demonstrates how to create a distributed custom execution plan with a `numbers(start, end)` table function.
+
+## Components
+
+**NumbersTableFunction** – Table function callable in SQL: `SELECT * FROM numbers(1, 100)`
+
+**NumbersExec** – Execution plan with `ranges_per_task: Vec<Range<i64>>` storing one range per task.
+Uses `DistributedTaskContext` to determine which range to generate.
+
+**NumbersExecCodec** – Protobuf-based serialization implementing `PhysicalExtensionCodec`.
+Must be registered in the `SessionStateBuilder` that initiates the query as well as the one used by `Worker`s.
+
+**NumbersTaskEstimator** – Controls distributed parallelism:
+
+- `task_estimation()` - Returns how many tasks are needed based on the range size and config
+- `scale_up_leaf_node()` - Splits a single range of numbers into N per-task ranges
+
+**NumbersConfig** – Custom config extension for controlling distributed parallelism (`numbers_per_task: usize`)
+
+## Usage
+
+This example imports the `InMemoryWorkerResolver` and the `InMemoryChannelResolver` used during integration testing
+of this project, so it must be run with the `--features integration` flag.
+
+This example demonstrates that the bigger the queried range of numbers, the more tasks are used to execute
+the query. For example:
+
+```bash
+cargo run \
+  --features integration \
+  --example custom_execution_plan \
+  "SELECT DISTINCT number FROM numbers(0, 10) ORDER BY number" \
+  --show-distributed-plan
+```
+
+```
+SortPreservingMergeExec: [number@0 ASC NULLS LAST]
+  SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
+    AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([number@0], 16), input_partitions=16
+          AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
+            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+              CooperativeExec
+                NumbersExec: t0:[0-10)
+```
+
+This will print a non-distributed plan, as the range of numbers we are querying (`numbers(0, 10)`) is small.
+
+The config parameter `numbers.numbers_per_task` controls how many distributed tasks are used in the
+query, and its default value is `10`, so querying 10 numbers will not distribute the plan.
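+
+Since the example executes every `;`-separated statement in the query argument and `NumbersConfig` is registered
+as a config extension, the parameter can also be tweaked at runtime with a `SET` statement. A hypothetical
+invocation:
+
+```bash
+cargo run \
+  --features integration \
+  --example custom_execution_plan \
+  "SET numbers.numbers_per_task = 5; SELECT DISTINCT number FROM numbers(0, 10) ORDER BY number" \
+  --show-distributed-plan
+```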
+
+However, if we try to query 11 numbers:
+
+```bash
+cargo run \
+  --features integration \
+  --example custom_execution_plan \
+  "SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" \
+  --show-distributed-plan
+```
+
+```
+┌───── DistributedExec ── Tasks: t0:[p0]
+│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
+│   [Stage 2] => NetworkCoalesceExec: output_partitions=32, input_tasks=2
+└──────────────────────────────────────────────────
+  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15]
+  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
+  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
+  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=2
+  └──────────────────────────────────────────────────
+    ┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31]
+    │ CoalesceBatchesExec: target_batch_size=8192
+    │   RepartitionExec: partitioning=Hash([number@0], 32), input_partitions=16
+    │     AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
+    │       RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+    │         CooperativeExec
+    │           NumbersExec: t0:[0-6), t1:[6-11)
+    └──────────────────────────────────────────────────
+```
+
+The distribution rule kicks in, and the plan gets distributed.
+
+Note that the parallelism in the plan has an upper bound, so for example, if we query 100 numbers:
+
+```bash
+cargo run \
+  --features integration \
+  --example custom_execution_plan \
+  "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
+  --show-distributed-plan
+```
+
+```
+┌───── DistributedExec ── Tasks: t0:[p0]
+│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
+│   [Stage 2] => NetworkCoalesceExec: output_partitions=48, input_tasks=3
+└──────────────────────────────────────────────────
+  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15]
+  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
+  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
+  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=4
+  └──────────────────────────────────────────────────
+    ┌───── Stage 1 ── Tasks: t0:[p0..p47] t1:[p0..p47] t2:[p0..p47] t3:[p0..p47]
+    │ CoalesceBatchesExec: target_batch_size=8192
+    │   RepartitionExec: partitioning=Hash([number@0], 48), input_partitions=16
+    │     AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
+    │       RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+    │         CooperativeExec
+    │           NumbersExec: t0:[0-25), t1:[25-50), t2:[50-75), t3:[75-100)
+    └──────────────────────────────────────────────────
+```
+
+We do not get 100/10 = 10 distributed tasks; we just get 4. This is because the example is configured by default to
+simulate a 4-worker cluster. If we increase the worker count, we get a highly distributed plan with 10 tasks:
+
+```bash
+cargo run \
+  --features integration \
+  --example custom_execution_plan \
+  "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
+  --workers 10 \
+  --show-distributed-plan
+```
+
+```
+┌───── DistributedExec ── Tasks: t0:[p0]
+│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
+│   [Stage 2] => NetworkCoalesceExec: output_partitions=112, input_tasks=7
+└──────────────────────────────────────────────────
+  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] t3:[p0..p15] t4:[p0..p15] t5:[p0..p15] t6:[p0..p15]
+  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
+  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
+  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=10
+  └──────────────────────────────────────────────────
+    ┌───── Stage 1 ── Tasks: t0:[p0..p111] t1:[p0..p111] t2:[p0..p111] t3:[p0..p111] t4:[p0..p111] t5:[p0..p111] t6:[p0..p111] t7:[p0..p111] t8:[p0..p111] t9:[p0..p111]
+    │ CoalesceBatchesExec: target_batch_size=8192
+    │   RepartitionExec: partitioning=Hash([number@0], 112), input_partitions=16
+    │     AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
+    │       RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+    │         CooperativeExec
+    │           NumbersExec: t0:[0-10), t1:[10-20), t2:[20-30), t3:[30-40), t4:[40-50), t5:[50-60), t6:[60-70), t7:[70-80), t8:[80-90), t9:[90-100)
+    └──────────────────────────────────────────────────
+```
+
diff --git a/examples/custom_execution_plan.rs b/examples/custom_execution_plan.rs
new file mode 100644
index 00000000..10c16d28
--- /dev/null
+++ b/examples/custom_execution_plan.rs
@@ -0,0 +1,408 @@
+//! This example demonstrates how to create a custom execution plan that works with
+//! Distributed DataFusion. It implements a `numbers(start, end)` table function that
+//! generates a sequence of numbers and can be distributed across multiple workers.
+//!
+//! This example includes:
+//! - Custom TableFunction for accepting the `numbers(start, end)` in SQL
+//! - Custom TableProvider for mapping the table function to an execution plan
+//! - Custom ExecutionPlan for returning the requested number range
+//! - Custom PhysicalExtensionCodec for serialization across the network
+//! - Custom TaskEstimator to control parallelism
+//!
+//! Run this example with:
+//! ```bash
+//! cargo run --features integration --example custom_execution_plan "SELECT DISTINCT number FROM numbers(0, 10) ORDER BY number" --show-distributed-plan
+//! cargo run --features integration --example custom_execution_plan "SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" --show-distributed-plan
+//! cargo run --features integration --example custom_execution_plan "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" --show-distributed-plan
+//! cargo run --features integration --example custom_execution_plan "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" --workers 10 --show-distributed-plan
+//! ```
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatchOptions;
+use arrow::util::pretty::pretty_format_batches;
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableFunctionImpl};
+use datafusion::common::{
+    DataFusionError, Result, ScalarValue, exec_err, extensions_options, internal_err, plan_err,
+};
+use datafusion::config::ConfigExtension;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder, TaskContext};
+use datafusion::logical_expr::Expr;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_distributed::test_utils::in_memory_channel_resolver::{
+    InMemoryChannelResolver, InMemoryWorkerResolver,
+};
+use datafusion_distributed::{
+    DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation,
+    TaskEstimator, WorkerQueryContext, display_plan_ascii,
+};
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
+use datafusion_proto::protobuf;
+use datafusion_proto::protobuf::proto_error;
+use futures::{TryStreamExt, stream};
+use prost::Message;
+use std::any::Any;
+use std::fmt::{self, Formatter};
+use std::ops::Range;
+use std::sync::Arc;
+use structopt::StructOpt;
+
+/// Table function that generates a sequence of numbers from start to end.
+/// Can be called in SQL as: SELECT * FROM numbers(start, end)
+#[derive(Debug)]
+struct NumbersTableFunction;
+
+impl TableFunctionImpl for NumbersTableFunction {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if exprs.len() != 2 {
+            return plan_err!(
+                "numbers() requires exactly 2 arguments (start, end), got {}",
+                exprs.len()
+            );
+        }
+        fn get_number(expr: &Expr) -> Result<i64> {
+            match &expr {
+                Expr::Literal(ScalarValue::Int64(Some(v)), _) => Ok(*v),
+                Expr::Literal(ScalarValue::Int32(Some(v)), _) => Ok(*v as i64),
+                v => plan_err!("numbers() arguments must be integer literals, got {v:?}"),
+            }
+        }
+        Ok(Arc::new(NumbersTableProvider {
+            start: get_number(&exprs[0])?,
+            end: get_number(&exprs[1])?,
+        }))
+    }
+}
+
+fn numbers_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![Field::new(
+        "number",
+        DataType::Int64,
+        false,
+    )]))
+}
+
+/// TableProvider that generates a sequence of numbers from start to end.
+#[derive(Debug)]
+struct NumbersTableProvider {
+    start: i64,
+    end: i64,
+}
+
+#[async_trait]
+impl TableProvider for NumbersTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        numbers_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let schema = match projection {
+            Some(indices) => Arc::new(self.schema().project(indices)?),
+            None => self.schema(),
+        };
+
+        #[allow(clippy::single_range_in_vec_init)]
+        Ok(Arc::new(NumbersExec::new([self.start..self.end], schema)))
+    }
+}
+
+/// Custom execution plan that generates numbers from start to end.
+/// When distributed, each task generates a subset of numbers based on its task_index.
+#[derive(Debug, Clone)]
+struct NumbersExec {
+    ranges_per_task: Vec<Range<i64>>,
+    plan_properties: PlanProperties,
+}
+
+impl NumbersExec {
+    fn new(ranges_per_task: impl IntoIterator<Item = Range<i64>>, schema: SchemaRef) -> Self {
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(schema.clone()),
+            datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            ranges_per_task: ranges_per_task.into_iter().collect(),
+            plan_properties,
+        }
+    }
+}
+
+impl DisplayAs for NumbersExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        write!(f, "NumbersExec: ")?;
+        for (task_i, range) in self.ranges_per_task.iter().enumerate() {
+            write!(f, "t{task_i}:[{}-{})", range.start, range.end)?;
+            if task_i < self.ranges_per_task.len() - 1 {
+                write!(f, ", ")?;
+            }
+        }
+        Ok(())
+    }
+}
+
+impl ExecutionPlan for NumbersExec {
+    fn name(&self) -> &str {
+        "NumbersExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        // Get the distributed task context to determine which subset of numbers
+        // this task should generate
+        let dist_ctx = DistributedTaskContext::from_ctx(&context);
+
+        let Some(range) = self.ranges_per_task.get(dist_ctx.task_index) else {
+            return exec_err!("Task index out of range");
+        };
+
+        // Calculate which numbers this task should generate
+        let numbers: Vec<i64> = range.clone().collect();
+        let row_count = numbers.len();
+
+        // Create batch matching the schema (may be empty for COUNT queries)
+        let batch = if self.schema().fields().is_empty() {
+            // For COUNT queries, return batch with correct row count but no columns
+            let mut options = RecordBatchOptions::new();
+            options.row_count = Some(row_count);
+            RecordBatch::try_new_with_options(self.schema(), vec![], &options)?
+        } else {
+            let array: ArrayRef = Arc::new(Int64Array::from(numbers));
+            RecordBatch::try_new(self.schema(), vec![array])?
+        };
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            stream::once(async { Ok(batch) }),
+        )))
+    }
+}
+
+/// Custom codec for serializing/deserializing NumbersExec across the network. As the NumbersExec
+/// plan will be sent over the wire during distributed queries, both the SessionContext that
+/// initiates the query and each Worker need to know how to (de)serialize it.
+#[derive(Debug)]
+struct NumbersExecCodec;
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+struct NumbersExecProto {
+    #[prost(message, optional, tag = "1")]
+    schema: Option<protobuf::Schema>,
+    #[prost(repeated, message, tag = "2")]
+    ranges: Vec<RangeProto>,
+}
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+struct RangeProto {
+    #[prost(int64, tag = "1")]
+    start: i64,
+    #[prost(int64, tag = "2")]
+    end: i64,
+}
+
+impl PhysicalExtensionCodec for NumbersExecCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[Arc<dyn ExecutionPlan>],
+        _ctx: &TaskContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !inputs.is_empty() {
+            return internal_err!("NumbersExec should have no children, got {}", inputs.len());
+        }
+
+        let proto = NumbersExecProto::decode(buf)
+            .map_err(|e| proto_error(format!("Failed to decode NumbersExec: {e}")))?;
+
+        let schema: Schema = proto
+            .schema
+            .as_ref()
+            .map(|s| s.try_into())
+            .ok_or(proto_error("NumbersExec is missing schema"))??;
+
+        Ok(Arc::new(NumbersExec::new(
+            proto.ranges.iter().map(|v| v.start..v.end),
+            Arc::new(schema),
+        )))
+    }
+
+    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
+        let Some(exec) = node.as_any().downcast_ref::<NumbersExec>() else {
+            return internal_err!("Expected plan to be NumbersExec, but was {}", node.name());
+        };
+
+        let proto = NumbersExecProto {
+            schema: Some(node.schema().try_into()?),
+            ranges: exec
+                .ranges_per_task
+                .iter()
+                .map(|v| RangeProto {
+                    start: v.start,
+                    end: v.end,
+                })
+                .collect(),
+        };
+
+        proto
+            .encode(buf)
+            .map_err(|e| proto_error(format!("Failed to encode NumbersExec: {e}")))
+    }
+}
+
+extensions_options! {
+    /// Custom ConfigExtension for configuring NumbersExec distributed task estimation behavior
+    /// at runtime with SET statements.
+    struct NumbersConfig {
+        /// how many numbers each task will produce
+        numbers_per_task: usize, default = 10
+    }
+}
+
+impl ConfigExtension for NumbersConfig {
+    const PREFIX: &'static str = "numbers";
+}
+
+/// Custom TaskEstimator that tells the planner how to distribute NumbersExec.
+#[derive(Debug)]
+struct NumbersTaskEstimator;
+
+impl TaskEstimator for NumbersTaskEstimator {
+    fn task_estimation(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        cfg: &datafusion::config::ConfigOptions,
+    ) -> Option<TaskEstimation> {
+        let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
+        let cfg: &NumbersConfig = cfg.extensions.get()?;
+        let task_count = (plan.ranges_per_task[0].end - plan.ranges_per_task[0].start) as f64
+            / cfg.numbers_per_task as f64;
+
+        Some(TaskEstimation::desired(task_count.ceil() as usize))
+    }
+
+    fn scale_up_leaf_node(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        task_count: usize,
+        _cfg: &datafusion::config::ConfigOptions,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
+        let range = &plan.ranges_per_task[0];
+        let chunk_size = ((range.end - range.start) as f64 / task_count as f64).ceil() as i64;
+
+        let ranges_per_task = (0..task_count).map(|i| {
+            let start = range.start + (i as i64 * chunk_size);
+            let end = (start + chunk_size).min(range.end);
+            start..end
+        });
+
+        Some(Arc::new(NumbersExec::new(ranges_per_task, plan.schema())))
+    }
+}
+
+#[derive(StructOpt)]
+#[structopt(
+    name = "custom_execution_plan",
+    about = "Example demonstrating custom execution plans with Distributed DataFusion"
+)]
+struct Args {
+    /// The SQL query to run.
+    #[structopt()]
+    query: String,
+
+    /// Number of distributed workers to simulate.
+    #[structopt(long, default_value = "4")]
+    workers: usize,
+
+    /// Whether the distributed plan should be rendered instead of executing the query.
+    #[structopt(long)]
+    show_distributed_plan: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args = Args::from_args();
+
+    // Simulate a cluster of N in-memory workers, each building its session
+    // with the custom codec so NumbersExec can be deserialized on arrival.
+    let worker_resolver = InMemoryWorkerResolver::new(args.workers);
+    let channel_resolver =
+        InMemoryChannelResolver::from_session_builder(|ctx: WorkerQueryContext| async move {
+            Ok(ctx
+                .builder
+                .with_distributed_user_codec(NumbersExecCodec)
+                .build())
+        });
+
+    let config = SessionConfig::new().with_option_extension(NumbersConfig::default());
+
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(config)
+        .with_distributed_worker_resolver(worker_resolver)
+        .with_distributed_channel_resolver(channel_resolver)
+        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
+        .with_distributed_user_codec(NumbersExecCodec)
+        .with_distributed_task_estimator(NumbersTaskEstimator)
+        .build();
+
+    let ctx = SessionContext::from(state);
+    ctx.register_udtf("numbers", Arc::new(NumbersTableFunction));
+
+    // Run every `;`-separated statement (e.g. SET statements followed by a
+    // SELECT); the DataFrame of the last statement is used below.
+    let mut df = None;
+    for query in args.query.split(';') {
+        df = Some(ctx.sql(query).await?);
+    }
+    let df = df.unwrap();
+    if args.show_distributed_plan {
+        let plan = df.create_physical_plan().await?;
+        println!("{}", display_plan_ascii(plan.as_ref(), false));
+    } else {
+        let stream = df.execute_stream().await?;
+        let batches = stream.try_collect::<Vec<_>>().await?;
+        let formatted = pretty_format_batches(&batches)?;
+        println!("{formatted}");
+    }
+    Ok(())
+}