# DataFusion Distributed

Library that brings distributed execution capabilities to [DataFusion](https://github.com/apache/datafusion).

> [!WARNING]
> This project is currently under construction and is not yet ready for production use.

## What can you do with this crate?

This crate is a toolkit that extends [DataFusion](https://github.com/apache/datafusion) with distributed capabilities,
providing a developer experience as close as possible to vanilla DataFusion while not being opinionated about the
networking stack used for hosting the different workers involved in a query.

Users of this library can expect to take their existing single-node DataFusion-based systems and add distributed
capabilities with minimal changes.

## Core tenets of the project

- Be as close as possible to vanilla DataFusion, providing a seamless integration with existing DataFusion systems and
  a familiar API for building applications.
- Unopinionated networking. This crate does not take any opinion about the networking stack, and users are expected
  to leverage their own infrastructure for hosting DataFusion nodes.
- No coordinator-worker architecture. To keep infrastructure simple, any node can act as a coordinator or a worker.

## Architecture

A distributed DataFusion query is executed in a very similar fashion to a normal DataFusion query, with one key
difference:

The physical plan is divided into stages that can be executed on different machines and that exchange data using Arrow
Flight. All of this is done at the physical plan level, and is implemented as a `PhysicalOptimizerRule` that:

1. Inspects the non-distributed plan, placing network boundaries (`ArrowFlightReadExec` nodes) in the appropriate places.
2. Based on the placed network boundaries, divides the plan into stages and assigns tasks to them (a sketch of wiring
   such a rule into a session follows this list).
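
As a rough sketch of what that wiring could look like: the snippet below registers such a rule with a DataFusion
session. The registration mechanism (`SessionStateBuilder::with_physical_optimizer_rule`) is vanilla DataFusion, while
`DistributedPhysicalOptimizerRule` is a placeholder name for this crate's rule:

```rust
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;

fn build_context() -> SessionContext {
    // `DistributedPhysicalOptimizerRule` is a hypothetical name standing in
    // for this crate's rule; any `PhysicalOptimizerRule` registers this way.
    let state = SessionStateBuilder::new()
        .with_default_features()
        // The rule runs after physical planning, splitting the plan into
        // stages separated by `ArrowFlightReadExec` network boundaries.
        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::default()))
        .build();
    SessionContext::new_with_state(state)
}
```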
| 35 | + |
| 36 | +For example, imagine we have a plan that looks like this: |
| 37 | + |
| 38 | +``` |
| 39 | + ┌──────────────────────┐ |
| 40 | + │ ProjectionExec │ |
| 41 | + └──────────────────────┘ |
| 42 | + ▲ |
| 43 | + ┌──────────┴───────────┐ |
| 44 | + │ AggregateExec │ |
| 45 | + │ (final) │ |
| 46 | + └──────────────────────┘ |
| 47 | + ▲ |
| 48 | + ┌──────────┴───────────┐ |
| 49 | + │ RepartionExec │ |
| 50 | + │ (3 input partitions) │ |
| 51 | + └──────────────────────┘ |
| 52 | + ▲ |
| 53 | + ┌──────────┴───────────┐ |
| 54 | + │ AggregateExec │ |
| 55 | + │ (partial) │ |
| 56 | + └──────────────────────┘ |
| 57 | + ▲ |
| 58 | + ┌──────────┴───────────┐ |
| 59 | + │ DataSourceExec │ |
| 60 | + └──────────────────────┘ |
| 61 | +``` |

We want to distribute the aggregation, turning the plan into something like this:

```
                            ┌──────────────────────┐
                            │    ProjectionExec    │
                            └──────────────────────┘
                                       ▲
                            ┌──────────┴───────────┐
                            │    AggregateExec     │
                            │       (final)        │
                            └──────────────────────┘
                                     ▲ ▲ ▲
           ┌─────────────────────────┘ │ └─────────────────────────┐
┌──────────┴───────────┐    ┌──────────┴───────────┐    ┌──────────┴───────────┐
│    AggregateExec     │    │    AggregateExec     │    │    AggregateExec     │
│      (partial)       │    │      (partial)       │    │      (partial)       │
└──────────────────────┘    └──────────────────────┘    └──────────────────────┘
           ▲                           ▲                           ▲
┌──────────┴───────────┐    ┌──────────┴───────────┐    ┌──────────┴───────────┐
│    DataSourceExec    │    │    DataSourceExec    │    │    DataSourceExec    │
└──────────────────────┘    └──────────────────────┘    └──────────────────────┘
```

The first step is to place the `ArrowFlightReadExec` network boundary in the appropriate place (the following drawing
shows the partitioning scheme in each node; a code sketch of this rewrite follows the drawing):

```
                 ┌──────────────────────┐
                 │    ProjectionExec    │
                 └─────────[0]──────────┘
                            ▲
                 ┌──────────┴───────────┐
                 │    AggregateExec     │
                 │       (final)        │
                 └─────────[0]──────────┘
                            ▲
                 ┌──────────┴───────────┐
                 │   ArrowFlightRead    │ <- this node was injected to tell the distributed planner
                 │   (3 input tasks)    │    that there must be a network boundary here.
                 └──────[0][1][2]───────┘
                         ▲  ▲  ▲
                 ┌───────┴──┴──┴────────┐
                 │    AggregateExec     │
                 │      (partial)       │
                 └──────[0][1][2]───────┘
                         ▲  ▲  ▲
                 ┌───────┴──┴──┴────────┐
                 │    DataSourceExec    │
                 └──────[0][1][2]───────┘
```
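
A simplified sketch of that rewrite, using DataFusion's `TreeNode` API (recent versions): every `RepartitionExec` is
swapped for a network boundary that reads the same input from remote tasks. `ArrowFlightReadExec::new` is a guessed
constructor, and the real placement logic is likely more involved:

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::ExecutionPlan;

// Walk the plan bottom-up and replace every RepartitionExec with a network
// boundary. `ArrowFlightReadExec::new(child)` is a hypothetical constructor.
fn place_boundaries(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let rewritten = plan.transform_up(|node| {
        if let Some(repartition) = node.as_any().downcast_ref::<RepartitionExec>() {
            // The boundary reads the repartition's input over the network
            // from 3 remote tasks instead of repartitioning it in-process.
            let boundary = ArrowFlightReadExec::new(repartition.input().clone());
            Ok(Transformed::yes(Arc::new(boundary) as _))
        } else {
            Ok(Transformed::no(node))
        }
    })?;
    Ok(rewritten.data)
}
```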

Based on that boundary, the plan is divided into stages and tasks are assigned to each stage:

```
                           ┌────── (stage 2) ───────┐
                           │┌──────────────────────┐│
                           ││    ProjectionExec    ││
                           │└──────────┬───────────┘│
                           │┌──────────┴───────────┐│
                           ││    AggregateExec     ││
                           ││       (final)        ││
                           │└──────────┬───────────┘│
                           │┌──────────┴───────────┐│
                           ││ ArrowFlightReadExec  ││
                           │└──────[0][1][2]───────┘│
                           └─────────▲─▲─▲──────────┘
            ┌────────────────────────┘ │ └────────────────────────┐
            │                          │                          │
┌─── task 0 (stage 1) ───┐ ┌─── task 1 (stage 1) ───┐ ┌─── task 2 (stage 1) ───┐
│                        │ │                        │ │                        │
│┌─────────[0]──────────┐│ │┌─────────[0]──────────┐│ │┌─────────[0]──────────┐│
││    AggregateExec     ││ ││    AggregateExec     ││ ││    AggregateExec     ││
││      (partial)       ││ ││      (partial)       ││ ││      (partial)       ││
│└──────────┬───────────┘│ │└──────────┬───────────┘│ │└──────────┬───────────┘│
│┌─────────[0]──────────┐│ │┌─────────[0]──────────┐│ │┌─────────[0]──────────┐│
││    DataSourceExec    ││ ││    DataSourceExec    ││ ││    DataSourceExec    ││
│└──────────────────────┘│ │└──────────────────────┘│ │└──────────────────────┘│
└────────────────────────┘ └────────────────────────┘ └────────────────────────┘
```

The plan is immediately executable, and the same process that planned the distributed query can start executing the
head stage (stage 2). The `ArrowFlightReadExec` in that stage knows which tasks in stage 1 to gather data from, and it
issues 3 concurrent Arrow Flight requests to the appropriate physical nodes, roughly as sketched below.
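
A minimal sketch of that fan-in with the `arrow-flight` crate's `FlightClient`; the worker URLs and the ticket
contents are made up for illustration:

```rust
use arrow_flight::{FlightClient, Ticket};
use futures::{future::try_join_all, TryStreamExt};
use tonic::transport::Channel;

type AnyError = Box<dyn std::error::Error>;

// Gather the output of each stage 1 task, concurrently.
// The URLs and the ticket format are placeholders for illustration.
async fn gather_stage_1(worker_urls: [&str; 3]) -> Result<(), AnyError> {
    let requests = worker_urls.iter().enumerate().map(|(task, url)| async move {
        let channel = Channel::from_shared(url.to_string())?.connect().await?;
        let mut client = FlightClient::new(channel);
        // In this crate the ticket would carry the serialized plan chunk plus
        // stage/task identifiers; a plain string stands in for it here.
        let stream = client.do_get(Ticket::new(format!("stage=1;task={task}"))).await?;
        // Drain the Arrow Flight stream into record batches.
        Ok::<_, AnyError>(stream.try_collect::<Vec<_>>().await?)
    });
    // One in-flight request per stage 1 task, i.e. 3 concurrent reads.
    let _batches_per_task = try_join_all(requests).await?;
    Ok(())
}
```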

This means that:

1. The head stage is executed normally, as if the query was not distributed.
2. Upon calling `.execute()` on the `ArrowFlightReadExec`, instead of recursively calling `.execute()` on its children,
   it serializes them and sends them over the wire to another node (see the serialization sketch after this list).
3. The next node, which hosts an Arrow Flight endpoint listening for gRPC requests over an HTTP/2 server, picks up the
   request containing the serialized chunk of the overall plan and executes it.
4. This is repeated for each stage, and data flows from the bottom to the top of the plan until it reaches the head
   stage.
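
Step 2 hinges on physical plan serialization. A sketch of what such a round trip could look like using the
`datafusion-proto` crate (the data source and query are arbitrary, and a custom node like `ArrowFlightReadExec` would
additionally need a custom codec):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

// Serialize a physical plan on the sending node and rebuild it on the
// receiving node. The CSV file and query are placeholders.
async fn plan_round_trip() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    let plan = ctx.sql("SELECT count(*) FROM t").await?.create_physical_plan().await?;

    // Sending side: encode the stage's subplan for the wire.
    let bytes = physical_plan_to_bytes(plan.clone())?;

    // Receiving side: decode against the local session context and execute.
    let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(plan.schema(), decoded.schema());
    Ok(())
}
```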