diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..8f4ef8f --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright: 2025 The Apache Software Foundation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md index e69de29..f97e48e 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,260 @@ +# DataFusion Distributed + +Library that brings distributed execution capabilities to [DataFusion](https://github.com/apache/datafusion). + +> [!WARNING] +> This project is currently under construction and is not yet ready for production use. + +## What can you do with this crate? + +This crate is a toolkit that extends [DataFusion](https://github.com/apache/datafusion) with distributed capabilities, +providing a developer experience as close as possible to vanilla DataFusion while being unopinionated about the +networking stack used for hosting the different workers involved in a query. + +Users of this library can expect to take their existing single-node DataFusion-based systems and add distributed +capabilities with minimal changes. + +## Core tenets of the project + +- Be as close as possible to vanilla DataFusion, providing a seamless integration with existing DataFusion systems and + a familiar API for building applications. +- Unopinionated about networking. This crate does not take any opinion about the networking stack, and users are + expected to leverage their own infrastructure for hosting DataFusion nodes. +- No coordinator-worker architecture. To keep infrastructure simple, any node can act as a coordinator or a worker. + +## Architecture + +Before diving into the architecture, it's important to clarify some terms and what they mean: + +- `worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface. +- `network boundary`: a node in the plan that streams data from a network interface rather than directly from its + children. Implemented as an `ArrowFlightReadExec` physical DataFusion node. +- `stage`: a portion of the plan separated by a network boundary from other parts of the plan. Implemented as any + other physical node in DataFusion. +- `task`: a unit of work inside a stage that executes a subset of its partitions in a specific worker. +- `subplan`: a slice of the overall plan + +A distributed DataFusion query is executed in a very similar fashion as a normal DataFusion query with one key +difference: + +The physical plan is divided into stages, each stage is assigned tasks that run in parallel in different workers. All of +this is done at the physical plan level, and is implemented as a `PhysicalOptimizerRule` that: + +1. Inspects the non-distributed plan, placing network boundaries (`ArrowFlightReadExec` nodes) in the appropriate places +2. Based on the placed network boundaries, divides the plan into stages and assigns tasks to them. + +For example, imagine we have a plan that looks like this: + +``` + ┌──────────────────────┐ + │ ProjectionExec │ + └──────────────────────┘ + ▲ + ┌──────────┴───────────┐ + │ AggregateExec │ + │ (final) │ + └──────────────────────┘ + ▲ + ┌──────────┴───────────┐ + │ RepartionExec │ + │ (3 input partitions) │ + └──────────────────────┘ + ▲ + ┌──────────┴───────────┐ + │ AggregateExec │ + │ (partial) │ + └──────────────────────┘ + ▲ + ┌──────────┴───────────┐ + │ DataSourceExec │ + └──────────────────────┘ +``` + +We want to distribute the aggregation to something like this: + +``` + ┌──────────────────────┐ + │ ProjectionExec │ + └──────────────────────┘ + ▲ + ┌──────────┴───────────┐ + │ AggregateExec │ + │ (final) │ + └──────────────────────┘ + ▲ ▲ ▲ + ┌────────────────────────┘ │ └─────────────────────────┐ + ┌──────────┴───────────┐ ┌──────────┴───────────┐ ┌───────────┴──────────┐ + │ AggregateExec │ │ AggregateExec │ │ AggregateExec │ + │ (partial) │ │ (partial) │ │ (partial) │ + └──────────────────────┘ └──────────────────────┘ └──────────────────────┘ + ▲ ▲ ▲ + ┌──────────┴───────────┐ ┌──────────┴───────────┐ ┌───────────┴──────────┐ + │ DataSourceExec │ │ DataSourceExec │ │ DataSourceExec │ + └──────────────────────┘ └──────────────────────┘ └──────────────────────┘ +``` + +The first step is to place the `ArrowFlightReadExec` network boundary in the appropriate place (the following drawing +shows the partitioning scheme in each node): + +``` + ┌──────────────────────┐ + │ ProjectionExec │ + └─────────[0]──────────┘ + ▲ + ┌──────────┴───────────┐ + │ AggregateExec │ + │ (final) │ + └─────────[0]──────────┘ + ▲ + ┌──────────┴───────────┐ + │ ArrowFlightRead │ <- this node was injected to tell the distributed planner + │ (3 input tasks) │ that there must be a network boundary here. + └──────[0][1][2]───────┘ + ▲ ▲ ▲ + ┌───────┴──┴──┴────────┐ + │ AggregateExec │ + │ (partial) │ + └──────[0][1][2]───────┘ + ▲ ▲ ▲ + ┌───────┴──┴──┴────────┐ + │ DataSourceExec │ + └──────[0][1][2]───────┘ +``` + +Based on that boundary, the plan is divided into stages, and tasks are assigned to each stage. Each task will be +responsible for the different partitions in the original plan. + +``` + ┌────── (stage 2) ───────┐ + │┌──────────────────────┐│ + ││ ProjectionExec ││ + │└──────────┬───────────┘│ + │┌──────────┴───────────┐│ + ││ AggregateExec ││ + ││ (final) ││ + │└──────────┬───────────┘│ + │┌──────────┴───────────┐│ + ││ ArrowFlightReadExec ││ + │└──────[0][1][2]───────┘│ + └─────────▲─▲─▲──────────┘ + ┌────────────────────────┘ │ └─────────────────────────┐ + │ │ │ + ┌─── task 0 (stage 1) ───┐ ┌── task 1 (stage 1) ────┐ ┌── task 2 (stage 1) ────┐ + │ │ │ │ │ │ │ │ │ + │┌─────────[0]──────────┐│ │┌─────────[1]──────────┐│ │┌──────────[2]─────────┐│ + ││ AggregateExec ││ ││ AggregateExec ││ ││ AggregateExec ││ + ││ (partial) ││ ││ (partial) ││ ││ (partial) ││ + │└──────────┬───────────┘│ │└──────────┬───────────┘│ │└───────────┬──────────┘│ + │┌─────────[0]──────────┐│ │┌─────────[1]──────────┐│ │┌──────────[2]─────────┐│ + ││ DataSourceExec ││ ││ DataSourceExec ││ ││ DataSourceExec ││ + │└──────────────────────┘│ │└──────────────────────┘│ │└──────────────────────┘│ + └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ +``` + +The plan is immediately executable, and the same process that planned the distributed query can start executing the head +stage (stage 2). The `ArrowFlightReadExec` in that stage will know from which tasks to gather data from stage 1, and +will issue 3 concurrent Arrow Flight requests to the appropriate physical nodes. + +This means that: + +1. The head stage is executed normally as if the query was not distributed. +2. Upon calling `.execute()` on `ArrowFlightReadExec`, instead of propagating the `.execute()` call on its child, + the subplan is serialized and sent over the wire to be executed on another worker. +3. The next node, which is hosting an Arrow Flight Endpoint listening for gRPC requests over an HTTP server, will pick + up the request containing the serialized chunk of the overall plan and execute it. +4. This is repeated for each stage, and data will start flowing from bottom to top until it reaches the head stage. + +## Getting familiar with distributed DataFusion + +There are some runnable examples showcasing how to provide a localhost implementation for Distributed DataFusion in +[examples/](examples): + +- [localhost_worker.rs](examples/localhost_worker.rs): code that spawns an Arrow Flight Endpoint listening for physical + plans over the network. +- [localhost_run.rs](examples/localhost_run.rs): code that distributes a query across the spawned Arrow Flight Endpoints + and executes it. + +The integration tests also provide an idea about how to use the library and what can be achieved with it: + +- [tpch_validation_test.rs](tests/tpch_validation_test.rs): executes all TPCH queries and performs assertions over the + distributed plans and the results vs running the queries in single node mode with a small scale factor. +- [custom_config_extension.rs](tests/custom_config_extension.rs): showcases how to propagate custom DataFusion config + extensions. +- [custom_extension_codec.rs](tests/custom_extension_codec.rs): showcases how to propagate custom physical extension + codecs. +- [distributed_aggregation.rs](tests/distributed_aggregation.rs): showcases how to manually place `ArrowFlightReadExec` + nodes in a plan and build a distributed query out of it. + +## Development + +### Prerequisites + +- Rust 1.85.1 or later (specified in `rust-toolchain.toml`) +- Git LFS for test data + +### Setup + +1. **Clone the repository:** + ```bash + git clone git@github.com:datafusion-contrib/datafusion-distributed + cd datafusion-distributed + ``` + +2. **Install Git LFS and fetch test data:** + ```bash + git lfs install + git lfs checkout + ``` + +### Running Tests + +**Unit and integration tests:** + +```bash +cargo test --features integration +``` + +### Running Examples + +**Start localhost workers:** + +```bash +# Terminal 1 +cargo run --example localhost_worker -- 8080 --cluster-ports 8080,8081 + +# Terminal 2 +cargo run --example localhost_worker -- 8081 --cluster-ports 8080,8081 +``` + +**Execute distributed queries:** + +```bash +cargo run --example localhost_run -- 'SELECT count(*) FROM weather' --cluster-ports 8080,8081 +``` + +### Benchmarks + +**Generate TPC-H benchmark data:** + +```bash +cd benchmarks +./gen-tpch.sh +``` + +**Run TPC-H benchmarks:** + +```bash +cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1 +``` + +### Project Structure + +- `src/` - Core library code + - `flight_service/` - Arrow Flight service implementation + - `plan/` - Physical plan extensions and operators + - `stage/` - Execution stage management + - `common/` - Shared utilities +- `examples/` - Usage examples +- `tests/` - Integration tests +- `benchmarks/` - Performance benchmarks +- `testdata/` - Test datasets